mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-01-02 13:23:13 +00:00
441 lines
13 KiB
Rust
441 lines
13 KiB
Rust
use std::{
|
|
net::{Ipv4Addr, TcpListener as StdTcpListener},
|
|
path::{Path, PathBuf},
|
|
time::Duration,
|
|
};
|
|
|
|
use anyhow::anyhow;
|
|
use testing_framework_core::{
|
|
adjust_timeout, scenario::CleanupGuard, topology::generation::GeneratedTopology,
|
|
};
|
|
use tokio::process::Command;
|
|
use tracing::{debug, info, warn};
|
|
use uuid::Uuid;
|
|
|
|
use crate::{
|
|
deployer::setup::DEFAULT_PROMETHEUS_PORT,
|
|
descriptor::ComposeDescriptor,
|
|
docker::{
|
|
commands::{compose_up, dump_compose_logs, run_docker_command},
|
|
ensure_compose_image,
|
|
platform::resolve_image,
|
|
workspace::ComposeWorkspace,
|
|
},
|
|
errors::{ComposeRunnerError, ConfigError, WorkspaceError},
|
|
infrastructure::{
|
|
cfgsync::{CfgsyncServerHandle, update_cfgsync_config},
|
|
template::write_compose_file,
|
|
},
|
|
lifecycle::cleanup::RunnerCleanup,
|
|
};
|
|
|
|
const CFGSYNC_START_TIMEOUT: Duration = Duration::from_secs(180);
|
|
const STACK_BRINGUP_MAX_ATTEMPTS: usize = 3;
|
|
|
|
/// Paths and flags describing the prepared compose workspace.
|
|
pub struct WorkspaceState {
|
|
pub workspace: ComposeWorkspace,
|
|
pub root: PathBuf,
|
|
pub cfgsync_path: PathBuf,
|
|
pub use_kzg: bool,
|
|
}
|
|
|
|
/// Holds paths and handles for a running docker-compose stack.
|
|
pub struct StackEnvironment {
|
|
compose_path: PathBuf,
|
|
project_name: String,
|
|
root: PathBuf,
|
|
workspace: Option<ComposeWorkspace>,
|
|
cfgsync_handle: Option<CfgsyncServerHandle>,
|
|
prometheus_port: u16,
|
|
}
|
|
|
|
impl StackEnvironment {
|
|
/// Builds an environment from the prepared workspace and compose artifacts.
|
|
pub fn from_workspace(
|
|
state: WorkspaceState,
|
|
compose_path: PathBuf,
|
|
project_name: String,
|
|
cfgsync_handle: Option<CfgsyncServerHandle>,
|
|
prometheus_port: u16,
|
|
) -> Self {
|
|
let WorkspaceState {
|
|
workspace, root, ..
|
|
} = state;
|
|
|
|
Self {
|
|
compose_path,
|
|
project_name,
|
|
root,
|
|
workspace: Some(workspace),
|
|
cfgsync_handle,
|
|
prometheus_port,
|
|
}
|
|
}
|
|
|
|
pub fn compose_path(&self) -> &Path {
|
|
&self.compose_path
|
|
}
|
|
|
|
/// Host port exposed by Prometheus.
|
|
pub const fn prometheus_port(&self) -> u16 {
|
|
self.prometheus_port
|
|
}
|
|
|
|
/// Docker compose project name.
|
|
pub fn project_name(&self) -> &str {
|
|
&self.project_name
|
|
}
|
|
|
|
/// Root directory that contains generated assets.
|
|
pub fn root(&self) -> &Path {
|
|
&self.root
|
|
}
|
|
|
|
/// Convert into a cleanup guard while keeping the environment borrowed.
|
|
pub fn take_cleanup(&mut self) -> RunnerCleanup {
|
|
RunnerCleanup::new(
|
|
self.compose_path.clone(),
|
|
self.project_name.clone(),
|
|
self.root.clone(),
|
|
self.workspace
|
|
.take()
|
|
.expect("workspace must be available while cleaning up"),
|
|
self.cfgsync_handle.take(),
|
|
)
|
|
}
|
|
|
|
/// Convert into a cleanup guard, consuming the environment.
|
|
pub fn into_cleanup(self) -> RunnerCleanup {
|
|
RunnerCleanup::new(
|
|
self.compose_path,
|
|
self.project_name,
|
|
self.root,
|
|
self.workspace
|
|
.expect("workspace must be available while cleaning up"),
|
|
self.cfgsync_handle,
|
|
)
|
|
}
|
|
|
|
/// Dump compose logs and trigger cleanup after a failure.
|
|
pub async fn fail(&mut self, reason: &str) {
|
|
use tracing::error;
|
|
|
|
error!(
|
|
reason = reason,
|
|
"compose stack failure; dumping docker logs"
|
|
);
|
|
dump_compose_logs(self.compose_path(), self.project_name(), self.root()).await;
|
|
Box::new(self.take_cleanup()).cleanup();
|
|
}
|
|
}
|
|
|
|
/// Represents a claimed port, optionally guarded by an open socket.
|
|
pub struct PortReservation {
|
|
port: u16,
|
|
_guard: Option<StdTcpListener>,
|
|
}
|
|
|
|
impl PortReservation {
|
|
/// Holds a port and an optional socket guard to keep it reserved.
|
|
pub const fn new(port: u16, guard: Option<StdTcpListener>) -> Self {
|
|
Self {
|
|
port,
|
|
_guard: guard,
|
|
}
|
|
}
|
|
|
|
/// The reserved port number.
|
|
pub const fn port(&self) -> u16 {
|
|
self.port
|
|
}
|
|
}
|
|
|
|
/// Verifies the topology has at least one validator so compose can start.
|
|
pub fn ensure_supported_topology(
|
|
descriptors: &GeneratedTopology,
|
|
) -> Result<(), ComposeRunnerError> {
|
|
let validators = descriptors.validators().len();
|
|
if validators == 0 {
|
|
return Err(ComposeRunnerError::MissingValidator {
|
|
validators,
|
|
executors: descriptors.executors().len(),
|
|
});
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Create a temporary workspace with copied testnet assets and derived paths.
|
|
pub fn prepare_workspace_state() -> Result<WorkspaceState, WorkspaceError> {
|
|
let workspace = ComposeWorkspace::create().map_err(WorkspaceError::new)?;
|
|
let root = workspace.root_path().to_path_buf();
|
|
let cfgsync_path = workspace.stack_dir().join("cfgsync.yaml");
|
|
let use_kzg = workspace.root_path().join("kzgrs_test_params").exists();
|
|
|
|
Ok(WorkspaceState {
|
|
workspace,
|
|
root,
|
|
cfgsync_path,
|
|
use_kzg,
|
|
})
|
|
}
|
|
|
|
/// Log wrapper for `prepare_workspace_state`.
|
|
pub fn prepare_workspace_logged() -> Result<WorkspaceState, ComposeRunnerError> {
|
|
info!("preparing compose workspace");
|
|
prepare_workspace_state().map_err(Into::into)
|
|
}
|
|
|
|
/// Render cfgsync config based on the topology and chosen port, logging
|
|
/// progress.
|
|
pub fn update_cfgsync_logged(
|
|
workspace: &WorkspaceState,
|
|
descriptors: &GeneratedTopology,
|
|
cfgsync_port: u16,
|
|
) -> Result<(), ComposeRunnerError> {
|
|
info!(cfgsync_port, "updating cfgsync configuration");
|
|
configure_cfgsync(workspace, descriptors, cfgsync_port).map_err(Into::into)
|
|
}
|
|
|
|
/// Start the cfgsync server container using the generated config.
|
|
pub async fn start_cfgsync_stage(
|
|
workspace: &WorkspaceState,
|
|
cfgsync_port: u16,
|
|
) -> Result<CfgsyncServerHandle, ComposeRunnerError> {
|
|
info!(cfgsync_port = cfgsync_port, "launching cfgsync server");
|
|
let handle = launch_cfgsync(&workspace.cfgsync_path, cfgsync_port).await?;
|
|
Ok(handle)
|
|
}
|
|
|
|
/// Update cfgsync YAML on disk with topology-derived values.
|
|
pub fn configure_cfgsync(
|
|
workspace: &WorkspaceState,
|
|
descriptors: &GeneratedTopology,
|
|
cfgsync_port: u16,
|
|
) -> Result<(), ConfigError> {
|
|
update_cfgsync_config(
|
|
&workspace.cfgsync_path,
|
|
descriptors,
|
|
workspace.use_kzg,
|
|
cfgsync_port,
|
|
)
|
|
.map_err(|source| ConfigError::Cfgsync {
|
|
path: workspace.cfgsync_path.clone(),
|
|
source,
|
|
})
|
|
}
|
|
|
|
/// Bind an ephemeral port for cfgsync, returning the chosen value.
|
|
pub fn allocate_cfgsync_port() -> Result<u16, ConfigError> {
|
|
let listener =
|
|
StdTcpListener::bind((Ipv4Addr::UNSPECIFIED, 0)).map_err(|source| ConfigError::Port {
|
|
source: source.into(),
|
|
})?;
|
|
|
|
let port = listener
|
|
.local_addr()
|
|
.map_err(|source| ConfigError::Port {
|
|
source: source.into(),
|
|
})?
|
|
.port();
|
|
Ok(port)
|
|
}
|
|
|
|
/// Launch cfgsync in a detached docker container on the provided port.
|
|
pub async fn launch_cfgsync(
|
|
cfgsync_path: &Path,
|
|
port: u16,
|
|
) -> Result<CfgsyncServerHandle, ConfigError> {
|
|
let testnet_dir = cfgsync_path
|
|
.parent()
|
|
.ok_or_else(|| ConfigError::CfgsyncStart {
|
|
port,
|
|
source: anyhow!("cfgsync path {cfgsync_path:?} has no parent directory"),
|
|
})?;
|
|
let (image, _) = resolve_image();
|
|
let container_name = format!("nomos-cfgsync-{}", Uuid::new_v4());
|
|
|
|
let mut command = Command::new("docker");
|
|
command
|
|
.arg("run")
|
|
.arg("-d")
|
|
.arg("--name")
|
|
.arg(&container_name)
|
|
.arg("--entrypoint")
|
|
.arg("cfgsync-server")
|
|
.arg("-p")
|
|
.arg(format!("{port}:{port}"))
|
|
.arg("-v")
|
|
.arg(format!(
|
|
"{}:/etc/nomos:ro",
|
|
testnet_dir
|
|
.canonicalize()
|
|
.unwrap_or_else(|_| testnet_dir.to_path_buf())
|
|
.display()
|
|
))
|
|
.arg(&image)
|
|
.arg("/etc/nomos/cfgsync.yaml");
|
|
|
|
run_docker_command(
|
|
command,
|
|
adjust_timeout(CFGSYNC_START_TIMEOUT),
|
|
"docker run cfgsync server",
|
|
)
|
|
.await
|
|
.map_err(|source| ConfigError::CfgsyncStart {
|
|
port,
|
|
source: anyhow!(source),
|
|
})?;
|
|
|
|
Ok(CfgsyncServerHandle::Container {
|
|
name: container_name,
|
|
stopped: false,
|
|
})
|
|
}
|
|
|
|
/// Render compose file and associated assets for the current topology.
|
|
pub fn write_compose_artifacts(
|
|
workspace: &WorkspaceState,
|
|
descriptors: &GeneratedTopology,
|
|
cfgsync_port: u16,
|
|
prometheus_port: u16,
|
|
) -> Result<PathBuf, ConfigError> {
|
|
let descriptor = ComposeDescriptor::builder(descriptors)
|
|
.with_kzg_mount(workspace.use_kzg)
|
|
.with_cfgsync_port(cfgsync_port)
|
|
.with_prometheus_port(prometheus_port)
|
|
.build()
|
|
.map_err(|source| ConfigError::Descriptor { source })?;
|
|
|
|
let compose_path = workspace.root.join("compose.generated.yml");
|
|
write_compose_file(&descriptor, &compose_path)
|
|
.map_err(|source| ConfigError::Template { source })?;
|
|
Ok(compose_path)
|
|
}
|
|
|
|
/// Log and wrap `write_compose_artifacts` errors for the runner.
|
|
pub fn render_compose_logged(
|
|
workspace: &WorkspaceState,
|
|
descriptors: &GeneratedTopology,
|
|
cfgsync_port: u16,
|
|
prometheus_port: u16,
|
|
) -> Result<PathBuf, ComposeRunnerError> {
|
|
info!(
|
|
cfgsync_port,
|
|
prometheus_port, "rendering compose file with ports"
|
|
);
|
|
write_compose_artifacts(workspace, descriptors, cfgsync_port, prometheus_port)
|
|
.map_err(Into::into)
|
|
}
|
|
|
|
/// Bring up docker compose; shut down cfgsync if start-up fails.
|
|
pub async fn bring_up_stack(
|
|
compose_path: &Path,
|
|
project_name: &str,
|
|
workspace_root: &Path,
|
|
cfgsync_handle: &mut CfgsyncServerHandle,
|
|
) -> Result<(), ComposeRunnerError> {
|
|
if let Err(err) = compose_up(compose_path, project_name, workspace_root).await {
|
|
cfgsync_handle.shutdown();
|
|
return Err(ComposeRunnerError::Compose(err));
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Log compose bring-up with context.
|
|
pub async fn bring_up_stack_logged(
|
|
compose_path: &Path,
|
|
project_name: &str,
|
|
workspace_root: &Path,
|
|
cfgsync_handle: &mut CfgsyncServerHandle,
|
|
) -> Result<(), ComposeRunnerError> {
|
|
info!(project = %project_name, "bringing up docker compose stack");
|
|
bring_up_stack(compose_path, project_name, workspace_root, cfgsync_handle).await
|
|
}
|
|
|
|
/// Prepare workspace, cfgsync, compose artifacts, and launch the stack.
|
|
pub async fn prepare_environment(
|
|
descriptors: &GeneratedTopology,
|
|
mut prometheus_port: PortReservation,
|
|
prometheus_port_locked: bool,
|
|
) -> Result<StackEnvironment, ComposeRunnerError> {
|
|
let workspace = prepare_workspace_logged()?;
|
|
let cfgsync_port = allocate_cfgsync_port()?;
|
|
update_cfgsync_logged(&workspace, descriptors, cfgsync_port)?;
|
|
ensure_compose_image().await?;
|
|
|
|
let attempts = if prometheus_port_locked {
|
|
1
|
|
} else {
|
|
STACK_BRINGUP_MAX_ATTEMPTS
|
|
};
|
|
let mut last_err = None;
|
|
|
|
for _ in 0..attempts {
|
|
let prometheus_port_value = prometheus_port.port();
|
|
let compose_path =
|
|
render_compose_logged(&workspace, descriptors, cfgsync_port, prometheus_port_value)?;
|
|
|
|
let project_name = format!("nomos-compose-{}", Uuid::new_v4());
|
|
let mut cfgsync_handle = start_cfgsync_stage(&workspace, cfgsync_port).await?;
|
|
|
|
drop(prometheus_port);
|
|
match bring_up_stack_logged(
|
|
&compose_path,
|
|
&project_name,
|
|
&workspace.root,
|
|
&mut cfgsync_handle,
|
|
)
|
|
.await
|
|
{
|
|
Ok(()) => {
|
|
info!(
|
|
project = %project_name,
|
|
compose_file = %compose_path.display(),
|
|
cfgsync_port,
|
|
prometheus_port = prometheus_port_value,
|
|
"compose stack is up"
|
|
);
|
|
return Ok(StackEnvironment::from_workspace(
|
|
workspace,
|
|
compose_path,
|
|
project_name,
|
|
Some(cfgsync_handle),
|
|
prometheus_port_value,
|
|
));
|
|
}
|
|
Err(err) => {
|
|
// Attempt to capture container logs even when bring-up fails early.
|
|
dump_compose_logs(&compose_path, &project_name, &workspace.root).await;
|
|
cfgsync_handle.shutdown();
|
|
last_err = Some(err);
|
|
if prometheus_port_locked {
|
|
break;
|
|
}
|
|
warn!(
|
|
error = %last_err.as_ref().unwrap(),
|
|
"compose bring-up failed; retrying with a new prometheus port"
|
|
);
|
|
prometheus_port = allocate_prometheus_port()
|
|
.unwrap_or_else(|| PortReservation::new(DEFAULT_PROMETHEUS_PORT, None));
|
|
debug!(
|
|
next_prometheus_port = prometheus_port.port(),
|
|
"retrying compose bring-up"
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
Err(last_err.expect("prepare_environment should return or fail with error"))
|
|
}
|
|
|
|
fn allocate_prometheus_port() -> Option<PortReservation> {
|
|
reserve_prometheus_port(DEFAULT_PROMETHEUS_PORT).or_else(|| reserve_prometheus_port(0))
|
|
}
|
|
|
|
fn reserve_prometheus_port(port: u16) -> Option<PortReservation> {
|
|
let listener = StdTcpListener::bind((Ipv4Addr::LOCALHOST, port)).ok()?;
|
|
let actual_port = listener.local_addr().ok()?.port();
|
|
Some(PortReservation::new(actual_port, Some(listener)))
|
|
}
|