Refactor compose deployer into modules

andrussal 2025-12-10 10:11:45 +01:00
parent 7a6db26fe1
commit 2d05b60c74
7 changed files with 338 additions and 222 deletions
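For orientation, the refactor splits the former monolithic deployer into focused submodules; roughly, the new layout declared in the deployer's mod.rs (see the hunks below) is:

pub mod clients;      // ClientBuilder: node API clients and block feed startup
pub mod orchestrator; // DeploymentOrchestrator: drives the end-to-end deploy flow
pub mod ports;        // PortManager: host port discovery for compose services
pub mod readiness;    // ReadinessChecker: validator/executor/remote readiness probes
pub mod setup;        // DeploymentSetup: docker/topology validation and workspace prep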

View File

@@ -0,0 +1,54 @@
use testing_framework_core::{
scenario::{BlockFeed, BlockFeedTask, NodeClients},
topology::generation::GeneratedTopology,
};
use tracing::info;
use crate::{
block_feed::spawn_block_feed_with_retry, environment::StackEnvironment,
errors::ComposeRunnerError, ports::HostPortMapping, readiness::build_node_clients_with_ports,
};
pub struct ClientBuilder;
impl ClientBuilder {
#[must_use]
pub const fn new() -> Self {
Self
}
pub async fn build_node_clients(
&self,
descriptors: &GeneratedTopology,
host_ports: &HostPortMapping,
host: &str,
environment: &mut StackEnvironment,
) -> Result<NodeClients, ComposeRunnerError> {
match build_node_clients_with_ports(descriptors, host_ports, host) {
Ok(clients) => Ok(clients),
Err(err) => {
environment
.fail("failed to construct node api clients")
.await;
Err(err.into())
}
}
}
pub async fn start_block_feed(
&self,
node_clients: &NodeClients,
environment: &mut StackEnvironment,
) -> Result<(BlockFeed, BlockFeedTask), ComposeRunnerError> {
match spawn_block_feed_with_retry(node_clients).await {
Ok(pair) => {
info!("block feed connected to validator");
Ok(pair)
}
Err(err) => {
environment.fail("failed to initialize block feed").await;
Err(err)
}
}
}
}

View File

@@ -1,38 +1,15 @@
use std::{
env,
net::{Ipv4Addr, TcpListener as StdTcpListener},
sync::Arc,
};
pub mod clients;
pub mod orchestrator;
pub mod ports;
pub mod readiness;
pub mod setup;
use async_trait::async_trait;
use testing_framework_core::{
scenario::{
BlockFeed, BlockFeedTask, CleanupGuard, Deployer, NodeClients, NodeControlHandle,
RequiresNodeControl, RunContext, Runner, Scenario,
},
topology::generation::GeneratedTopology,
use testing_framework_core::scenario::{
BlockFeedTask, CleanupGuard, Deployer, RequiresNodeControl, Runner, Scenario,
};
use tracing::{debug, info};
use crate::{
block_feed::spawn_block_feed_with_retry,
cleanup::RunnerCleanup,
control::ComposeNodeControl,
docker::ensure_docker_available,
environment::{
PortReservation, StackEnvironment, ensure_supported_topology, prepare_environment,
},
errors::ComposeRunnerError,
ports::{
HostPortMapping, compose_runner_host, discover_host_ports,
ensure_remote_readiness_with_ports,
},
readiness::{
build_node_clients_with_ports, ensure_executors_ready_with_ports,
ensure_validators_ready_with_ports, maybe_sleep_for_disabled_readiness,
metrics_handle_from_port,
},
};
use crate::{cleanup::RunnerCleanup, errors::ComposeRunnerError};
/// Docker Compose-based deployer for Nomos test scenarios.
#[derive(Clone, Copy)]
@@ -59,130 +36,6 @@ impl ComposeDeployer {
self.readiness_checks = enabled;
self
}
async fn prepare_ports(
&self,
environment: &mut StackEnvironment,
descriptors: &GeneratedTopology,
) -> Result<HostPortMapping, ComposeRunnerError> {
debug!("resolving host ports for compose services");
match discover_host_ports(environment, descriptors).await {
Ok(mapping) => {
info!(
validator_ports = ?mapping.validator_api_ports(),
executor_ports = ?mapping.executor_api_ports(),
prometheus_port = environment.prometheus_port(),
"resolved container host ports"
);
Ok(mapping)
}
Err(err) => {
environment
.fail("failed to determine container host ports")
.await;
Err(err)
}
}
}
async fn wait_for_readiness(
&self,
descriptors: &GeneratedTopology,
host_ports: &HostPortMapping,
environment: &mut StackEnvironment,
) -> Result<(), ComposeRunnerError> {
info!(
ports = ?host_ports.validator_api_ports(),
"waiting for validator HTTP endpoints"
);
if let Err(err) =
ensure_validators_ready_with_ports(&host_ports.validator_api_ports()).await
{
environment.fail("validator readiness failed").await;
return Err(err.into());
}
info!(
ports = ?host_ports.executor_api_ports(),
"waiting for executor HTTP endpoints"
);
if let Err(err) = ensure_executors_ready_with_ports(&host_ports.executor_api_ports()).await
{
environment.fail("executor readiness failed").await;
return Err(err.into());
}
info!("waiting for remote service readiness");
if let Err(err) = ensure_remote_readiness_with_ports(descriptors, host_ports).await {
environment.fail("remote readiness probe failed").await;
return Err(err.into());
}
Ok(())
}
async fn build_node_clients(
&self,
descriptors: &GeneratedTopology,
host_ports: &HostPortMapping,
host: &str,
environment: &mut StackEnvironment,
) -> Result<NodeClients, ComposeRunnerError> {
match build_node_clients_with_ports(descriptors, host_ports, host) {
Ok(clients) => Ok(clients),
Err(err) => {
environment
.fail("failed to construct node api clients")
.await;
Err(err.into())
}
}
}
async fn start_block_feed(
&self,
node_clients: &NodeClients,
environment: &mut StackEnvironment,
) -> Result<(BlockFeed, BlockFeedTask), ComposeRunnerError> {
match spawn_block_feed_with_retry(node_clients).await {
Ok(pair) => {
info!("block feed connected to validator");
Ok(pair)
}
Err(err) => {
environment.fail("failed to initialize block feed").await;
Err(err)
}
}
}
fn maybe_node_control<Caps>(
&self,
environment: &StackEnvironment,
) -> Option<Arc<dyn NodeControlHandle>>
where
Caps: RequiresNodeControl + Send + Sync,
{
Caps::REQUIRED.then(|| {
Arc::new(ComposeNodeControl {
compose_file: environment.compose_path().to_path_buf(),
project_name: environment.project_name().to_owned(),
}) as Arc<dyn NodeControlHandle>
})
}
}
pub(crate) const PROMETHEUS_PORT_ENV: &str = "TEST_FRAMEWORK_PROMETHEUS_PORT";
pub(crate) const DEFAULT_PROMETHEUS_PORT: u16 = 9090;
fn allocate_prometheus_port() -> Option<PortReservation> {
reserve_port(DEFAULT_PROMETHEUS_PORT).or_else(|| reserve_port(0))
}
fn reserve_port(port: u16) -> Option<PortReservation> {
let listener = StdTcpListener::bind((Ipv4Addr::LOCALHOST, port)).ok()?;
let actual_port = listener.local_addr().ok()?.port();
Some(PortReservation::new(actual_port, Some(listener)))
}
#[async_trait]
@@ -193,75 +46,13 @@ where
type Error = ComposeRunnerError;
async fn deploy(&self, scenario: &Scenario<Caps>) -> Result<Runner, Self::Error> {
ensure_docker_available().await?;
let descriptors = scenario.topology().clone();
ensure_supported_topology(&descriptors)?;
info!(
validators = descriptors.validators().len(),
executors = descriptors.executors().len(),
"starting compose deployment"
);
let prometheus_env = env::var(PROMETHEUS_PORT_ENV)
.ok()
.and_then(|raw| raw.parse::<u16>().ok());
if prometheus_env.is_some() {
info!(port = prometheus_env, "using prometheus port from env");
}
let prometheus_port = prometheus_env
.and_then(|port| reserve_port(port))
.or_else(|| allocate_prometheus_port())
.unwrap_or_else(|| PortReservation::new(DEFAULT_PROMETHEUS_PORT, None));
let mut environment =
prepare_environment(&descriptors, prometheus_port, prometheus_env.is_some()).await?;
info!(
compose_file = %environment.compose_path().display(),
project = environment.project_name(),
root = %environment.root().display(),
"compose workspace prepared"
);
let host_ports = self.prepare_ports(&mut environment, &descriptors).await?;
if self.readiness_checks {
self.wait_for_readiness(&descriptors, &host_ports, &mut environment)
.await?;
} else {
info!("readiness checks disabled; giving the stack a short grace period");
maybe_sleep_for_disabled_readiness(false).await;
}
info!("compose stack ready; building node clients");
let host = compose_runner_host();
let node_clients = self
.build_node_clients(&descriptors, &host_ports, &host, &mut environment)
.await?;
let telemetry = metrics_handle_from_port(environment.prometheus_port(), &host)?;
let node_control = self.maybe_node_control::<Caps>(&environment);
let (block_feed, block_feed_guard) = self
.start_block_feed(&node_clients, &mut environment)
.await?;
let cleanup_guard: Box<dyn CleanupGuard> = Box::new(ComposeCleanupGuard::new(
environment.into_cleanup(),
block_feed_guard,
));
let context = RunContext::new(
descriptors,
None,
node_clients,
scenario.duration(),
telemetry,
block_feed,
node_control,
);
Ok(Runner::new(context, Some(cleanup_guard)))
orchestrator::DeploymentOrchestrator::new(*self)
.deploy(scenario)
.await
}
}
struct ComposeCleanupGuard {
pub(super) struct ComposeCleanupGuard {
environment: RunnerCleanup,
block_feed: Option<BlockFeedTask>,
}
@@ -284,6 +75,13 @@ impl CleanupGuard for ComposeCleanupGuard {
}
}
pub(super) fn make_cleanup_guard(
environment: RunnerCleanup,
block_feed: BlockFeedTask,
) -> Box<dyn CleanupGuard> {
Box::new(ComposeCleanupGuard::new(environment, block_feed))
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, net::Ipv4Addr};

View File

@@ -0,0 +1,94 @@
use std::sync::Arc;
use testing_framework_core::scenario::{
NodeControlHandle, RequiresNodeControl, RunContext, Runner, Scenario,
};
use tracing::info;
use super::{
ComposeDeployer,
clients::ClientBuilder,
make_cleanup_guard,
ports::PortManager,
readiness::ReadinessChecker,
setup::{DeploymentContext, DeploymentSetup},
};
use crate::{
control::ComposeNodeControl, errors::ComposeRunnerError, ports::compose_runner_host,
readiness::metrics_handle_from_port,
};
pub struct DeploymentOrchestrator {
deployer: ComposeDeployer,
}
impl DeploymentOrchestrator {
pub const fn new(deployer: ComposeDeployer) -> Self {
Self { deployer }
}
pub async fn deploy<Caps>(
&self,
scenario: &Scenario<Caps>,
) -> Result<Runner, ComposeRunnerError>
where
Caps: RequiresNodeControl + Send + Sync,
{
let setup = DeploymentSetup::new(scenario.topology());
setup.validate_environment().await?;
let DeploymentContext {
mut environment,
descriptors,
} = setup.prepare_workspace().await?;
let host_ports = PortManager::prepare(&mut environment, &descriptors).await?;
if self.deployer.readiness_checks {
ReadinessChecker::wait_all(&descriptors, &host_ports, &mut environment).await?;
} else {
info!("readiness checks disabled; giving the stack a short grace period");
crate::readiness::maybe_sleep_for_disabled_readiness(false).await;
}
let host = compose_runner_host();
let client_builder = ClientBuilder::new();
let node_clients = client_builder
.build_node_clients(&descriptors, &host_ports, &host, &mut environment)
.await?;
let telemetry = metrics_handle_from_port(environment.prometheus_port(), &host)?;
let node_control = self.maybe_node_control::<Caps>(&environment);
let (block_feed, block_feed_guard) = client_builder
.start_block_feed(&node_clients, &mut environment)
.await?;
let cleanup_guard = make_cleanup_guard(environment.into_cleanup(), block_feed_guard);
let context = RunContext::new(
descriptors,
None,
node_clients,
scenario.duration(),
telemetry,
block_feed,
node_control,
);
Ok(Runner::new(context, Some(cleanup_guard)))
}
fn maybe_node_control<Caps>(
&self,
environment: &crate::environment::StackEnvironment,
) -> Option<Arc<dyn NodeControlHandle>>
where
Caps: RequiresNodeControl + Send + Sync,
{
Caps::REQUIRED.then(|| {
Arc::new(ComposeNodeControl {
compose_file: environment.compose_path().to_path_buf(),
project_name: environment.project_name().to_owned(),
}) as Arc<dyn NodeControlHandle>
})
}
}
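The `maybe_node_control` helper gates construction of the control handle on the capability's associated const. A minimal, self-contained sketch of that pattern (hypothetical trait and types, not part of this commit):

trait RequiresFeature {
    // Associated const analogous to RequiresNodeControl::REQUIRED above.
    const REQUIRED: bool;
}

struct Controlled;
impl RequiresFeature for Controlled {
    const REQUIRED: bool = true;
}

struct Uncontrolled;
impl RequiresFeature for Uncontrolled {
    const REQUIRED: bool = false;
}

fn maybe_handle<C: RequiresFeature>() -> Option<&'static str> {
    // bool::then builds the handle only when the capability requires it.
    C::REQUIRED.then(|| "control handle")
}

fn main() {
    assert_eq!(maybe_handle::<Controlled>(), Some("control handle"));
    assert_eq!(maybe_handle::<Uncontrolled>(), None);
}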

View File

@@ -0,0 +1,36 @@
use testing_framework_core::topology::generation::GeneratedTopology;
use tracing::{debug, info};
use crate::{
environment::StackEnvironment,
errors::ComposeRunnerError,
ports::{HostPortMapping, discover_host_ports},
};
pub struct PortManager;
impl PortManager {
pub async fn prepare(
environment: &mut StackEnvironment,
descriptors: &GeneratedTopology,
) -> Result<HostPortMapping, ComposeRunnerError> {
debug!("resolving host ports for compose services");
match discover_host_ports(environment, descriptors).await {
Ok(mapping) => {
info!(
validator_ports = ?mapping.validator_api_ports(),
executor_ports = ?mapping.executor_api_ports(),
prometheus_port = environment.prometheus_port(),
"resolved container host ports"
);
Ok(mapping)
}
Err(err) => {
environment
.fail("failed to determine container host ports")
.await;
Err(err)
}
}
}
}

View File

@@ -0,0 +1,48 @@
use testing_framework_core::topology::generation::GeneratedTopology;
use tracing::info;
use crate::{
environment::StackEnvironment,
errors::ComposeRunnerError,
ports::{HostPortMapping, ensure_remote_readiness_with_ports},
readiness::{ensure_executors_ready_with_ports, ensure_validators_ready_with_ports},
};
pub struct ReadinessChecker;
impl ReadinessChecker {
pub async fn wait_all(
descriptors: &GeneratedTopology,
host_ports: &HostPortMapping,
environment: &mut StackEnvironment,
) -> Result<(), ComposeRunnerError> {
info!(
ports = ?host_ports.validator_api_ports(),
"waiting for validator HTTP endpoints"
);
if let Err(err) =
ensure_validators_ready_with_ports(&host_ports.validator_api_ports()).await
{
environment.fail("validator readiness failed").await;
return Err(err.into());
}
info!(
ports = ?host_ports.executor_api_ports(),
"waiting for executor HTTP endpoints"
);
if let Err(err) = ensure_executors_ready_with_ports(&host_ports.executor_api_ports()).await
{
environment.fail("executor readiness failed").await;
return Err(err.into());
}
info!("waiting for remote service readiness");
if let Err(err) = ensure_remote_readiness_with_ports(descriptors, host_ports).await {
environment.fail("remote readiness probe failed").await;
return Err(err.into());
}
Ok(())
}
}

View File

@@ -0,0 +1,86 @@
use std::{
env,
net::{Ipv4Addr, TcpListener as StdTcpListener},
};
use testing_framework_core::topology::generation::GeneratedTopology;
use tracing::info;
use crate::{
docker::ensure_docker_available,
environment::{
PortReservation, StackEnvironment, ensure_supported_topology, prepare_environment,
},
errors::ComposeRunnerError,
};
pub const PROMETHEUS_PORT_ENV: &str = "TEST_FRAMEWORK_PROMETHEUS_PORT";
pub const DEFAULT_PROMETHEUS_PORT: u16 = 9090;
pub struct DeploymentSetup {
descriptors: GeneratedTopology,
}
pub struct DeploymentContext {
pub descriptors: GeneratedTopology,
pub environment: StackEnvironment,
}
impl DeploymentSetup {
pub fn new(descriptors: &GeneratedTopology) -> Self {
Self {
descriptors: descriptors.clone(),
}
}
pub async fn validate_environment(&self) -> Result<(), ComposeRunnerError> {
ensure_docker_available().await?;
ensure_supported_topology(&self.descriptors)?;
info!(
validators = self.descriptors.validators().len(),
executors = self.descriptors.executors().len(),
"starting compose deployment"
);
Ok(())
}
pub async fn prepare_workspace(self) -> Result<DeploymentContext, ComposeRunnerError> {
let prometheus_env = env::var(PROMETHEUS_PORT_ENV)
.ok()
.and_then(|raw| raw.parse::<u16>().ok());
if prometheus_env.is_some() {
info!(port = prometheus_env, "using prometheus port from env");
}
let prometheus_port = prometheus_env
.and_then(|port| reserve_port(port))
.or_else(|| allocate_prometheus_port())
.unwrap_or_else(|| PortReservation::new(DEFAULT_PROMETHEUS_PORT, None));
let environment =
prepare_environment(&self.descriptors, prometheus_port, prometheus_env.is_some())
.await?;
info!(
compose_file = %environment.compose_path().display(),
project = environment.project_name(),
root = %environment.root().display(),
"compose workspace prepared"
);
Ok(DeploymentContext {
descriptors: self.descriptors,
environment,
})
}
}
fn allocate_prometheus_port() -> Option<PortReservation> {
reserve_port(DEFAULT_PROMETHEUS_PORT).or_else(|| reserve_port(0))
}
fn reserve_port(port: u16) -> Option<PortReservation> {
let listener = StdTcpListener::bind((Ipv4Addr::LOCALHOST, port)).ok()?;
let actual_port = listener.local_addr().ok()?.port();
Some(PortReservation::new(actual_port, Some(listener)))
}
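The `reserve_port(0)` fallback in `allocate_prometheus_port` relies on the OS assigning an ephemeral port when binding to port 0. A minimal standalone illustration of that behaviour (not part of the diff):

use std::net::{Ipv4Addr, TcpListener};

fn main() -> std::io::Result<()> {
    // Port 0 asks the OS for any free ephemeral port.
    let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0))?;
    // local_addr() reveals the port that was actually assigned;
    // holding the listener keeps the port reserved until it is dropped.
    println!("reserved ephemeral port {}", listener.local_addr()?.port());
    Ok(())
}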

View File

@@ -16,7 +16,7 @@ use crate::{
cfgsync::{CfgsyncServerHandle, update_cfgsync_config},
cleanup::RunnerCleanup,
commands::{compose_up, dump_compose_logs},
deployer::DEFAULT_PROMETHEUS_PORT,
deployer::setup::DEFAULT_PROMETHEUS_PORT,
descriptor::ComposeDescriptor,
docker::{ensure_compose_image, run_docker_command},
errors::{ComposeRunnerError, ConfigError, WorkspaceError},