From 74659780fe889467eda82e4ab6d6fe867a01085c Mon Sep 17 00:00:00 2001 From: andrussal Date: Thu, 29 Jan 2026 12:12:47 +0100 Subject: [PATCH] Compose manual cluster --- Cargo.lock | 1 + scripts/build/build-bundle.sh | 25 +- scripts/build/build_test_image.sh | 4 + .../assets/stack/Dockerfile.base | 5 + .../stack/scripts/docker/prepare_binaries.sh | 1 + .../core/src/topology/generation.rs | 3 +- .../src/topology/readiness/manual_network.rs} | 54 +++- .../core/src/topology/readiness/mod.rs | 4 + .../deployers/compose/Cargo.toml | 1 + .../compose/assets/docker-compose.yml.tera | 27 -- .../deployers/compose/src/deployer/mod.rs | 23 +- .../deployers/compose/src/descriptor/mod.rs | 4 +- .../deployers/compose/src/docker/commands.rs | 94 ++++-- .../compose/src/infrastructure/cfgsync.rs | 9 + .../compose/src/infrastructure/environment.rs | 166 +++++++++- .../compose/src/infrastructure/ports.rs | 27 +- .../deployers/compose/src/lib.rs | 2 + .../compose/src/lifecycle/cleanup.rs | 5 + .../deployers/compose/src/manual/mod.rs | 299 ++++++++++++++++++ .../deployers/compose/src/manual/network.rs | 29 ++ .../deployers/compose/src/manual/readiness.rs | 39 +++ .../deployers/compose/src/manual/state.rs | 83 +++++ .../deployers/compose/tests/manual_cluster.rs | 66 ++++ .../deployers/local/src/manual/mod.rs | 8 +- .../deployers/local/src/node_control/mod.rs | 57 ++-- 25 files changed, 907 insertions(+), 129 deletions(-) rename testing-framework/{deployers/local/src/manual/readiness.rs => core/src/topology/readiness/manual_network.rs} (63%) create mode 100644 testing-framework/deployers/compose/src/manual/mod.rs create mode 100644 testing-framework/deployers/compose/src/manual/network.rs create mode 100644 testing-framework/deployers/compose/src/manual/readiness.rs create mode 100644 testing-framework/deployers/compose/src/manual/state.rs create mode 100644 testing-framework/deployers/compose/tests/manual_cluster.rs diff --git a/Cargo.lock b/Cargo.lock index c239e76..d37beff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6513,6 +6513,7 @@ dependencies = [ "logos-blockchain-groth16", "logos-blockchain-key-management-system-service", "logos-blockchain-ledger", + "logos-blockchain-network-service", "logos-blockchain-tracing", "logos-blockchain-tracing-service", "logos-blockchain-zksign", diff --git a/scripts/build/build-bundle.sh b/scripts/build/build-bundle.sh index 4a32ede..ad61cac 100755 --- a/scripts/build/build-bundle.sh +++ b/scripts/build/build-bundle.sh @@ -132,6 +132,23 @@ build_bundle::default_docker_platform() { esac } +build_bundle::ensure_circuits() { + if [ -n "${LOGOS_BLOCKCHAIN_CIRCUITS:-}" ]; then + [ -d "${LOGOS_BLOCKCHAIN_CIRCUITS}" ] || build_bundle::fail \ + "LOGOS_BLOCKCHAIN_CIRCUITS is set but missing: ${LOGOS_BLOCKCHAIN_CIRCUITS}" + return 0 + fi + + local default_dir="${HOME}/.logos-blockchain-circuits" + if [ ! -d "${default_dir}" ]; then + echo "==> Circuits not found; installing to ${default_dir}" + bash "${ROOT_DIR}/scripts/setup/setup-logos-blockchain-circuits.sh" "${VERSION}" "${default_dir}" + fi + + LOGOS_BLOCKCHAIN_CIRCUITS="${default_dir}" + export LOGOS_BLOCKCHAIN_CIRCUITS +} + build_bundle::parse_args() { PLATFORM="host" OUTPUT="" @@ -239,6 +256,9 @@ build_bundle::maybe_run_linux_build_in_docker() { ;; esac fi + if [ -n "${LOGOS_BLOCKCHAIN_CIRCUITS:-}" ] && [ -d "${LOGOS_BLOCKCHAIN_CIRCUITS}" ]; then + extra_mounts+=("-v" "${LOGOS_BLOCKCHAIN_CIRCUITS}:/root/.logos-blockchain-circuits:ro") + fi echo "==> Building Linux bundle inside Docker" local container_output="/workspace${OUTPUT#"${ROOT_DIR}"}" @@ -263,6 +283,7 @@ build_bundle::maybe_run_linux_build_in_docker() { -e VERSION="${VERSION}" \ -e LOGOS_BLOCKCHAIN_NODE_REV="${LOGOS_BLOCKCHAIN_NODE_REV}" \ -e LOGOS_BLOCKCHAIN_NODE_PATH="${node_path_env}" \ + -e LOGOS_BLOCKCHAIN_CIRCUITS="/root/.logos-blockchain-circuits" \ -e LOGOS_BLOCKCHAIN_BUNDLE_DOCKER_PLATFORM="${DOCKER_PLATFORM}" \ -e LOGOS_BLOCKCHAIN_EXTRA_FEATURES="${LOGOS_BLOCKCHAIN_EXTRA_FEATURES:-}" \ -e BUNDLE_IN_CONTAINER=1 \ @@ -329,14 +350,15 @@ build_bundle::build_binaries() { if [ -z "${LOGOS_BLOCKCHAIN_NODE_PATH}" ]; then build_bundle::apply_nomos_node_patches "${NODE_SRC}" fi - unset CARGO_FEATURE_BUILD_VERIFICATION_KEY if [ -n "${BUNDLE_RUSTUP_TOOLCHAIN}" ]; then + CARGO_FEATURE_BUILD_VERIFICATION_KEY=1 \ RUSTFLAGS='--cfg feature="pol-dev-mode"' \ RUSTUP_TOOLCHAIN="${BUNDLE_RUSTUP_TOOLCHAIN}" \ cargo build --all-features \ -p logos-blockchain-node \ --target-dir "${NODE_TARGET}" else + CARGO_FEATURE_BUILD_VERIFICATION_KEY=1 \ RUSTFLAGS='--cfg feature="pol-dev-mode"' \ cargo build --all-features \ -p logos-blockchain-node \ @@ -385,6 +407,7 @@ build_bundle::main() { build_bundle::clean_cargo_linux_cache build_bundle::parse_args "$@" build_bundle::validate_and_finalize + build_bundle::ensure_circuits build_bundle::maybe_run_linux_build_in_docker build_bundle::prepare_circuits build_bundle::build_binaries diff --git a/scripts/build/build_test_image.sh b/scripts/build/build_test_image.sh index 4e07960..1a7a9f0 100755 --- a/scripts/build/build_test_image.sh +++ b/scripts/build/build_test_image.sh @@ -164,6 +164,10 @@ build_test_image::docker_build() { linux-aarch64) target_platform="linux/arm64" ;; esac fi + if [ -z "${target_platform}" ] && [ "${host_platform}" = "linux/arm64" ]; then + # Default to amd64 so circuits download matches available release artifacts. + target_platform="linux/amd64" + fi local -a base_build_args=( -f "${BASE_DOCKERFILE_PATH}" diff --git a/testing-framework/assets/stack/Dockerfile.base b/testing-framework/assets/stack/Dockerfile.base index aeceff6..635beda 100644 --- a/testing-framework/assets/stack/Dockerfile.base +++ b/testing-framework/assets/stack/Dockerfile.base @@ -40,6 +40,8 @@ RUN /workspace/testing-framework/assets/stack/scripts/setup-logos-blockchain-cir ENV LOGOS_BLOCKCHAIN_CIRCUITS=/opt/circuits +RUN /workspace/scripts/build/build-rapidsnark.sh /opt/circuits + RUN /workspace/testing-framework/assets/stack/scripts/docker/prepare_binaries.sh # Strip local path patches so container builds use git sources. @@ -68,5 +70,8 @@ RUN apt-get update && apt-get install -yq \ COPY --from=builder /workspace/artifacts/logos-blockchain-node /usr/bin/logos-blockchain-node COPY --from=builder /workspace/artifacts/cfgsync-server /usr/bin/cfgsync-server COPY --from=builder /workspace/artifacts/cfgsync-client /usr/bin/cfgsync-client +COPY --from=builder /opt/circuits /opt/circuits + +ENV LOGOS_BLOCKCHAIN_CIRCUITS=/opt/circuits EXPOSE 3000 8080 9000 60000 diff --git a/testing-framework/assets/stack/scripts/docker/prepare_binaries.sh b/testing-framework/assets/stack/scripts/docker/prepare_binaries.sh index 90a0b8c..79bd0ba 100755 --- a/testing-framework/assets/stack/scripts/docker/prepare_binaries.sh +++ b/testing-framework/assets/stack/scripts/docker/prepare_binaries.sh @@ -51,6 +51,7 @@ git reset --hard git clean -fdx # Enable pol-dev-mode via cfg to let POL_PROOF_DEV_MODE short-circuit proofs in tests. +# Build with default features disabled for proof crates (dummy verification keys). RUSTFLAGS='--cfg feature="pol-dev-mode"' \ cargo build --features "testing" -p logos-blockchain-node diff --git a/testing-framework/core/src/topology/generation.rs b/testing-framework/core/src/topology/generation.rs index 270e4f3..17e9ae3 100644 --- a/testing-framework/core/src/topology/generation.rs +++ b/testing-framework/core/src/topology/generation.rs @@ -170,8 +170,7 @@ async fn wait_for_network_readiness( let listen_ports = topology.listen_ports(); let initial_peer_ports = topology.initial_peer_ports(); - let expected_peer_counts = - crate::topology::generation::find_expected_peer_counts(&listen_ports, &initial_peer_ports); + let expected_peer_counts = find_expected_peer_counts(&listen_ports, &initial_peer_ports); let network_check = HttpNetworkReadiness { client, diff --git a/testing-framework/deployers/local/src/manual/readiness.rs b/testing-framework/core/src/topology/readiness/manual_network.rs similarity index 63% rename from testing-framework/deployers/local/src/manual/readiness.rs rename to testing-framework/core/src/topology/readiness/manual_network.rs index 327a2af..3843811 100644 --- a/testing-framework/deployers/local/src/manual/readiness.rs +++ b/testing-framework/core/src/topology/readiness/manual_network.rs @@ -1,23 +1,61 @@ -use std::time::Duration; +use std::{collections::HashSet, time::Duration}; use nomos_network::backends::libp2p::Libp2pInfo; -use testing_framework_core::topology::readiness::ReadinessCheck; use tokio::time::timeout; -use crate::node_control::ReadinessNode; +use super::ReadinessCheck; +use crate::{nodes::ApiClient, topology::generation::find_expected_peer_counts}; const NETWORK_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); -pub(super) struct ManualNetworkReadiness { +#[derive(Clone)] +pub struct ReadinessNode { + pub label: String, + pub expected_peers: Option, + pub api: ApiClient, +} + +pub struct ManualNetworkReadiness { nodes: Vec, } impl ManualNetworkReadiness { - pub(super) fn new(nodes: Vec) -> Self { + pub fn new(nodes: Vec) -> Self { Self { nodes } } } +pub struct ManualNetworkStatus { + label: String, + expected_peers: Option, + result: Result, +} + +pub fn build_readiness_nodes(iter: I) -> Vec +where + I: IntoIterator)>, +{ + let entries = iter.into_iter().collect::>(); + let listen_ports = entries.iter().map(|entry| entry.2).collect::>(); + + let initial_peer_ports = entries + .iter() + .map(|entry| entry.3.clone()) + .collect::>(); + + let expected_peer_counts = find_expected_peer_counts(&listen_ports, &initial_peer_ports); + + entries + .into_iter() + .enumerate() + .map(|(idx, (label, api, _, _))| ReadinessNode { + label, + expected_peers: expected_peer_counts.get(idx).copied(), + api, + }) + .collect() +} + #[async_trait::async_trait] impl<'a> ReadinessCheck<'a> for ManualNetworkReadiness { type Data = Vec; @@ -66,9 +104,3 @@ impl<'a> ReadinessCheck<'a> for ManualNetworkReadiness { format!("timed out waiting for network readiness: {summary}") } } - -pub(super) struct ManualNetworkStatus { - label: String, - expected_peers: Option, - result: Result, -} diff --git a/testing-framework/core/src/topology/readiness/mod.rs b/testing-framework/core/src/topology/readiness/mod.rs index 7d2e8e1..79ec4ec 100644 --- a/testing-framework/core/src/topology/readiness/mod.rs +++ b/testing-framework/core/src/topology/readiness/mod.rs @@ -1,7 +1,11 @@ +pub mod manual_network; pub mod network; use std::time::Duration; +pub use manual_network::{ + ManualNetworkReadiness, ManualNetworkStatus, ReadinessNode, build_readiness_nodes, +}; pub use network::{HttpNetworkReadiness, NetworkReadiness}; use thiserror::Error; use tokio::time::{sleep, timeout}; diff --git a/testing-framework/deployers/compose/Cargo.toml b/testing-framework/deployers/compose/Cargo.toml index f9e787f..6c18695 100644 --- a/testing-framework/deployers/compose/Cargo.toml +++ b/testing-framework/deployers/compose/Cargo.toml @@ -16,6 +16,7 @@ workspace = true anyhow = "1" async-trait = { workspace = true } cfgsync_tf = { workspace = true } +nomos-network = { workspace = true } nomos-tracing = { workspace = true } nomos-tracing-service = { workspace = true } reqwest = { features = ["json"], workspace = true } diff --git a/testing-framework/deployers/compose/assets/docker-compose.yml.tera b/testing-framework/deployers/compose/assets/docker-compose.yml.tera index 13bedfd..ba21922 100644 --- a/testing-framework/deployers/compose/assets/docker-compose.yml.tera +++ b/testing-framework/deployers/compose/assets/docker-compose.yml.tera @@ -29,31 +29,4 @@ services: - seccomp=unconfined restart: on-failure -{% if node.platform %} platform: {{ node.platform }} -{% endif %} entrypoint: {{ node.entrypoint }} - volumes: -{% for volume in node.volumes %} - - {{ volume }} -{% endfor %} -{% if node.extra_hosts | length > 0 %} - extra_hosts: -{% for host in node.extra_hosts %} - - {{ host }} -{% endfor %} -{% endif %} - ports: -{% for port in node.ports %} - - {{ port }} -{% endfor %} - environment: -{% for env in node.environment %} - {{ env.key }}: "{{ env.value }}" -{% endfor %} - cap_add: - - SYS_ADMIN - - SYS_PTRACE - security_opt: - - seccomp=unconfined - restart: on-failure - {% endfor %} diff --git a/testing-framework/deployers/compose/src/deployer/mod.rs b/testing-framework/deployers/compose/src/deployer/mod.rs index df822f8..ae068fe 100644 --- a/testing-framework/deployers/compose/src/deployer/mod.rs +++ b/testing-framework/deployers/compose/src/deployer/mod.rs @@ -5,12 +5,19 @@ pub mod readiness; pub mod setup; use async_trait::async_trait; -use testing_framework_core::scenario::{ - BlockFeedTask, CleanupGuard, Deployer, ObservabilityCapabilityProvider, RequiresNodeControl, - Runner, Scenario, +use testing_framework_core::{ + scenario::{ + BlockFeedTask, CleanupGuard, Deployer, ObservabilityCapabilityProvider, + RequiresNodeControl, Runner, Scenario, + }, + topology::config::TopologyConfig, }; -use crate::{errors::ComposeRunnerError, lifecycle::cleanup::RunnerCleanup}; +use crate::{ + errors::ComposeRunnerError, + lifecycle::cleanup::RunnerCleanup, + manual::{ComposeManualCluster, ManualClusterError}, +}; /// Docker Compose-based deployer for Logos test scenarios. #[derive(Clone, Copy)] @@ -37,6 +44,14 @@ impl ComposeDeployer { self.readiness_checks = enabled; self } + + /// Build a manual cluster using this deployer's compose implementation. + pub async fn manual_cluster( + &self, + config: TopologyConfig, + ) -> Result { + ComposeManualCluster::from_config(config).await + } } #[async_trait] diff --git a/testing-framework/deployers/compose/src/descriptor/mod.rs b/testing-framework/deployers/compose/src/descriptor/mod.rs index 305e5f7..3f1efe2 100644 --- a/testing-framework/deployers/compose/src/descriptor/mod.rs +++ b/testing-framework/deployers/compose/src/descriptor/mod.rs @@ -123,6 +123,8 @@ fn base_environment(cfgsync_port: u16) -> Vec { let rust_log = tf_env::rust_log().unwrap_or_else(|| "info".to_string()); let nomos_log_level = tf_env::nomos_log_level().unwrap_or_else(|| "info".to_string()); let time_backend = tf_env::nomos_time_backend().unwrap_or_else(|| "monotonic".into()); + let cfgsync_host = + env::var("LOGOS_BLOCKCHAIN_CFGSYNC_HOST").unwrap_or_else(|_| "cfgsync".to_string()); vec![ EnvEntry::new("POL_PROOF_DEV_MODE", pol_mode), EnvEntry::new("RUST_LOG", rust_log), @@ -130,7 +132,7 @@ fn base_environment(cfgsync_port: u16) -> Vec { EnvEntry::new("LOGOS_BLOCKCHAIN_TIME_BACKEND", time_backend), EnvEntry::new( "CFG_SERVER_ADDR", - format!("http://host.docker.internal:{cfgsync_port}"), + format!("http://{cfgsync_host}:{cfgsync_port}"), ), EnvEntry::new("OTEL_METRIC_EXPORT_INTERVAL", "5000"), ] diff --git a/testing-framework/deployers/compose/src/docker/commands.rs b/testing-framework/deployers/compose/src/docker/commands.rs index 9386754..dbeea86 100644 --- a/testing-framework/deployers/compose/src/docker/commands.rs +++ b/testing-framework/deployers/compose/src/docker/commands.rs @@ -47,15 +47,8 @@ pub async fn compose_up( project_name: &str, root: &Path, ) -> Result<(), ComposeCommandError> { - let mut cmd = Command::new("docker"); - cmd.arg("compose") - .arg("-f") - .arg(compose_path) - .arg("-p") - .arg(project_name) - .arg("up") - .arg("-d") - .current_dir(root); + let mut cmd = compose_command(compose_path, project_name, root); + cmd.arg("up").arg("-d"); info!( compose_file = %compose_path.display(), @@ -67,21 +60,64 @@ pub async fn compose_up( run_compose_command(cmd, adjust_timeout(COMPOSE_UP_TIMEOUT), "docker compose up").await } +/// Runs `docker compose up --no-start` for the generated stack. +pub async fn compose_create( + compose_path: &Path, + project_name: &str, + root: &Path, +) -> Result<(), ComposeCommandError> { + let mut cmd = compose_command(compose_path, project_name, root); + cmd.arg("up").arg("--no-start"); + + info!( + compose_file = %compose_path.display(), + project = project_name, + root = %root.display(), + "running docker compose create" + ); + + run_compose_command( + cmd, + adjust_timeout(COMPOSE_UP_TIMEOUT), + "docker compose create", + ) + .await +} + +/// Runs `docker compose up -d --no-deps ` for a single service. +pub async fn compose_up_service( + compose_path: &Path, + project_name: &str, + root: &Path, + service: &str, +) -> Result<(), ComposeCommandError> { + let mut cmd = compose_command(compose_path, project_name, root); + cmd.arg("up").arg("-d").arg("--no-deps").arg(service); + + info!( + compose_file = %compose_path.display(), + project = project_name, + root = %root.display(), + service, + "running docker compose up for service" + ); + + run_compose_command( + cmd, + adjust_timeout(COMPOSE_UP_TIMEOUT), + "docker compose up service", + ) + .await +} + /// Runs `docker compose down --volumes` for the generated stack. pub async fn compose_down( compose_path: &Path, project_name: &str, root: &Path, ) -> Result<(), ComposeCommandError> { - let mut cmd = Command::new("docker"); - cmd.arg("compose") - .arg("-f") - .arg(compose_path) - .arg("-p") - .arg(project_name) - .arg("down") - .arg("--volumes") - .current_dir(root); + let mut cmd = compose_command(compose_path, project_name, root); + cmd.arg("down").arg("--volumes"); info!( compose_file = %compose_path.display(), @@ -100,15 +136,8 @@ pub async fn compose_down( /// Dump docker compose logs to stderr for debugging failures. pub async fn dump_compose_logs(compose_file: &Path, project: &str, root: &Path) { - let mut cmd = Command::new("docker"); - cmd.arg("compose") - .arg("-f") - .arg(compose_file) - .arg("-p") - .arg(project) - .arg("logs") - .arg("--no-color") - .current_dir(root); + let mut cmd = compose_command(compose_file, project, root); + cmd.arg("logs").arg("--no-color"); match cmd.output().await { Ok(output) => print_logs(&output.stdout, &output.stderr), @@ -146,6 +175,17 @@ async fn run_compose_command( } } +fn compose_command(compose_path: &Path, project_name: &str, root: &Path) -> Command { + let mut cmd = Command::new("docker"); + cmd.arg("compose") + .arg("-f") + .arg(compose_path) + .arg("-p") + .arg(project_name) + .current_dir(root); + cmd +} + fn handle_compose_status( status: std::io::Result, description: &str, diff --git a/testing-framework/deployers/compose/src/infrastructure/cfgsync.rs b/testing-framework/deployers/compose/src/infrastructure/cfgsync.rs index a4b05ea..65e76ce 100644 --- a/testing-framework/deployers/compose/src/infrastructure/cfgsync.rs +++ b/testing-framework/deployers/compose/src/infrastructure/cfgsync.rs @@ -27,6 +27,15 @@ impl CfgsyncServerHandle { _ => {} } } + + /// Prevent automatic shutdown for preserved runs. + pub fn mark_preserved(&mut self) { + match self { + Self::Container { stopped, .. } => { + *stopped = true; + } + } + } } fn remove_container(name: &str) { diff --git a/testing-framework/deployers/compose/src/infrastructure/environment.rs b/testing-framework/deployers/compose/src/infrastructure/environment.rs index fbf491a..7eb16d2 100644 --- a/testing-framework/deployers/compose/src/infrastructure/environment.rs +++ b/testing-framework/deployers/compose/src/infrastructure/environment.rs @@ -9,14 +9,18 @@ use reqwest::Url; use testing_framework_core::{ adjust_timeout, scenario::CleanupGuard, topology::generation::GeneratedTopology, }; -use tokio::process::Command; -use tracing::{debug, error, info}; +use tokio::{ + net::TcpStream, + process::Command, + time::{Instant, sleep}, +}; +use tracing::{debug, error, info, warn}; use uuid::Uuid; use crate::{ descriptor::ComposeDescriptor, docker::{ - commands::{compose_up, dump_compose_logs, run_docker_command}, + commands::{compose_create, compose_up, dump_compose_logs, run_docker_command}, ensure_compose_image, platform::resolve_image, workspace::ComposeWorkspace, @@ -30,6 +34,8 @@ use crate::{ }; const CFGSYNC_START_TIMEOUT: Duration = Duration::from_secs(180); +const CFGSYNC_READY_TIMEOUT: Duration = Duration::from_secs(60); +const CFGSYNC_READY_POLL: Duration = Duration::from_secs(2); /// Paths and flags describing the prepared compose workspace. pub struct WorkspaceState { @@ -136,9 +142,11 @@ pub fn ensure_supported_topology( descriptors: &GeneratedTopology, ) -> Result<(), ComposeRunnerError> { let nodes = descriptors.nodes().len(); + if nodes == 0 { return Err(ComposeRunnerError::MissingNode { nodes }); } + Ok(()) } @@ -192,10 +200,17 @@ pub fn update_cfgsync_logged( pub async fn start_cfgsync_stage( workspace: &WorkspaceState, cfgsync_port: u16, + project_name: &str, ) -> Result { info!(cfgsync_port = cfgsync_port, "launching cfgsync server"); - let handle = launch_cfgsync(&workspace.cfgsync_path, cfgsync_port).await?; + + let network = compose_network_name(project_name); + let handle = launch_cfgsync(&workspace.cfgsync_path, cfgsync_port, &network).await?; + + wait_for_cfgsync_ready(cfgsync_port, Some(&handle)).await?; + debug!(container = ?handle, "cfgsync server launched"); + Ok(handle) } @@ -231,7 +246,9 @@ pub fn allocate_cfgsync_port() -> Result { source: source.into(), })? .port(); + debug!(port, "allocated cfgsync port"); + Ok(port) } @@ -239,6 +256,7 @@ pub fn allocate_cfgsync_port() -> Result { pub async fn launch_cfgsync( cfgsync_path: &Path, port: u16, + network: &str, ) -> Result { let testnet_dir = cfgsync_path .parent() @@ -246,8 +264,10 @@ pub async fn launch_cfgsync( port, source: anyhow!("cfgsync path {cfgsync_path:?} has no parent directory"), })?; + let (image, _) = resolve_image(); let container_name = format!("nomos-cfgsync-{}", Uuid::new_v4()); + debug!( container = %container_name, image, @@ -262,6 +282,10 @@ pub async fn launch_cfgsync( .arg("-d") .arg("--name") .arg(&container_name) + .arg("--network") + .arg(network) + .arg("--network-alias") + .arg("cfgsync") .arg("--entrypoint") .arg("cfgsync-server") .arg("-p") @@ -273,9 +297,31 @@ pub async fn launch_cfgsync( .canonicalize() .unwrap_or_else(|_| testnet_dir.to_path_buf()) .display() - )) - .arg(&image) - .arg("/etc/nomos/cfgsync.yaml"); + )); + let circuits_dir = std::env::var("LOGOS_BLOCKCHAIN_CIRCUITS_DOCKER") + .ok() + .or_else(|| std::env::var("LOGOS_BLOCKCHAIN_CIRCUITS").ok()); + if let Some(circuits_dir) = circuits_dir { + let host_path = PathBuf::from(&circuits_dir); + if host_path.exists() { + command + .arg("-e") + .arg(format!("LOGOS_BLOCKCHAIN_CIRCUITS={circuits_dir}")) + .arg("-v") + .arg(format!( + "{}:{circuits_dir}:ro", + host_path.canonicalize().unwrap_or(host_path).display() + )); + } + } + + if let Ok(pol_mode) = std::env::var("POL_PROOF_DEV_MODE") { + command + .arg("-e") + .arg(format!("POL_PROOF_DEV_MODE={pol_mode}")); + } + + command.arg(&image).arg("/etc/nomos/cfgsync.yaml"); run_docker_command( command, @@ -372,7 +418,8 @@ pub async fn prepare_environment( let compose_path = render_compose_logged(&workspace, descriptors, cfgsync_port)?; let project_name = format!("nomos-compose-{}", Uuid::new_v4()); - let mut cfgsync_handle = start_cfgsync_stage(&workspace, cfgsync_port).await?; + compose_create(&compose_path, &project_name, &workspace.root).await?; + let mut cfgsync_handle = start_cfgsync_stage(&workspace, cfgsync_port, &project_name).await?; if let Err(err) = bring_up_stack_logged( &compose_path, @@ -401,3 +448,106 @@ pub async fn prepare_environment( Some(cfgsync_handle), )) } + +/// Prepare workspace, cfgsync, and compose artifacts without starting services. +pub async fn prepare_environment_manual( + descriptors: &GeneratedTopology, + metrics_otlp_ingest_url: Option<&Url>, +) -> Result { + let workspace = prepare_workspace_logged()?; + let cfgsync_port = allocate_cfgsync_port()?; + + update_cfgsync_logged( + &workspace, + descriptors, + cfgsync_port, + metrics_otlp_ingest_url, + )?; + ensure_compose_image().await?; + let compose_path = render_compose_logged(&workspace, descriptors, cfgsync_port)?; + + let project_name = format!("nomos-compose-{}", Uuid::new_v4()); + compose_create(&compose_path, &project_name, &workspace.root).await?; + let cfgsync_handle = start_cfgsync_stage(&workspace, cfgsync_port, &project_name).await?; + + info!( + project = %project_name, + compose_file = %compose_path.display(), + cfgsync_port, + "compose manual environment prepared" + ); + + Ok(StackEnvironment::from_workspace( + workspace, + compose_path, + project_name, + Some(cfgsync_handle), + )) +} + +async fn wait_for_cfgsync_ready( + port: u16, + handle: Option<&CfgsyncServerHandle>, +) -> Result<(), ComposeRunnerError> { + let deadline = Instant::now() + CFGSYNC_READY_TIMEOUT; + let addr = format!("127.0.0.1:{port}"); + + loop { + match TcpStream::connect(&addr).await { + Ok(_) => { + info!(port, "cfgsync server is reachable"); + return Ok(()); + } + Err(err) => { + if Instant::now() >= deadline { + dump_cfgsync_logs(handle).await; + return Err(ComposeRunnerError::Config(ConfigError::CfgsyncStart { + port, + source: anyhow!("cfgsync not reachable: {err}"), + })); + } + } + } + sleep(CFGSYNC_READY_POLL).await; + } +} + +async fn dump_cfgsync_logs(handle: Option<&CfgsyncServerHandle>) { + let Some(name) = handle.and_then(cfgsync_container_name) else { + return; + }; + + let mut cmd = Command::new("docker"); + cmd.arg("logs").arg(name); + + match cmd.output().await { + Ok(output) => { + if !output.stdout.is_empty() { + warn!( + logs = %String::from_utf8_lossy(&output.stdout), + container = name, + "cfgsync stdout" + ); + } + + if !output.stderr.is_empty() { + warn!( + logs = %String::from_utf8_lossy(&output.stderr), + container = name, + "cfgsync stderr" + ); + } + } + Err(err) => warn!(error = ?err, container = name, "failed to collect cfgsync logs"), + } +} + +fn cfgsync_container_name(handle: &CfgsyncServerHandle) -> Option<&str> { + match handle { + CfgsyncServerHandle::Container { name, .. } => Some(name.as_str()), + } +} + +fn compose_network_name(project_name: &str) -> String { + format!("{project_name}_default") +} diff --git a/testing-framework/deployers/compose/src/infrastructure/ports.rs b/testing-framework/deployers/compose/src/infrastructure/ports.rs index 03db06c..977b16a 100644 --- a/testing-framework/deployers/compose/src/infrastructure/ports.rs +++ b/testing-framework/deployers/compose/src/infrastructure/ports.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{path::Path, time::Duration}; use anyhow::{Context as _, anyhow}; use reqwest::Url; @@ -69,17 +69,34 @@ async fn resolve_service_port( environment: &StackEnvironment, service: &str, container_port: u16, +) -> Result { + resolve_service_port_with( + environment.compose_path(), + environment.project_name(), + environment.root(), + service, + container_port, + ) + .await +} + +pub(crate) async fn resolve_service_port_with( + compose_path: &Path, + project_name: &str, + root: &Path, + service: &str, + container_port: u16, ) -> Result { let mut cmd = Command::new("docker"); cmd.arg("compose") .arg("-f") - .arg(environment.compose_path()) + .arg(compose_path) .arg("-p") - .arg(environment.project_name()) + .arg(project_name) .arg("port") .arg(service) .arg(container_port.to_string()) - .current_dir(environment.root()); + .current_dir(root); let output = timeout(adjust_timeout(COMPOSE_PORT_DISCOVERY_TIMEOUT), cmd.output()) .await @@ -148,7 +165,7 @@ fn localhost_url(port: u16) -> Result { Url::parse(&format!("http://{}:{port}/", compose_runner_host())) } -fn node_identifier(index: usize) -> String { +pub(crate) fn node_identifier(index: usize) -> String { format!("node-{index}") } diff --git a/testing-framework/deployers/compose/src/lib.rs b/testing-framework/deployers/compose/src/lib.rs index aabf08d..b954017 100644 --- a/testing-framework/deployers/compose/src/lib.rs +++ b/testing-framework/deployers/compose/src/lib.rs @@ -4,6 +4,7 @@ pub mod docker; pub mod errors; pub mod infrastructure; pub mod lifecycle; +pub mod manual; pub use deployer::ComposeDeployer; pub use descriptor::{ComposeDescriptor, ComposeDescriptorBuilder, EnvEntry, NodeDescriptor}; @@ -16,3 +17,4 @@ pub use infrastructure::{ ports::{HostPortMapping, NodeHostPorts}, template::{TemplateError, repository_root, write_compose_file}, }; +pub use manual::{ComposeManualCluster, ManualClusterError}; diff --git a/testing-framework/deployers/compose/src/lifecycle/cleanup.rs b/testing-framework/deployers/compose/src/lifecycle/cleanup.rs index dfc8bfc..e0fcd97 100644 --- a/testing-framework/deployers/compose/src/lifecycle/cleanup.rs +++ b/testing-framework/deployers/compose/src/lifecycle/cleanup.rs @@ -110,6 +110,11 @@ impl RunnerCleanup { info!(path = %keep.display(), "preserving docker state"); } + if let Some(mut cfgsync) = self.cfgsync.take() { + cfgsync.mark_preserved(); + self.cfgsync = Some(cfgsync); + } + info!("compose preserve flag set; skipping docker compose down"); } diff --git a/testing-framework/deployers/compose/src/manual/mod.rs b/testing-framework/deployers/compose/src/manual/mod.rs new file mode 100644 index 0000000..acb0378 --- /dev/null +++ b/testing-framework/deployers/compose/src/manual/mod.rs @@ -0,0 +1,299 @@ +use std::{path, sync::Mutex}; + +use async_trait::async_trait; +use path::{Path, PathBuf}; +use testing_framework_core::{ + manual::ManualClusterHandle, + nodes::ApiClient, + scenario::{CleanupGuard, DynError, PeerSelection, StartNodeOptions, StartedNode}, + topology::{ + config::{TopologyBuildError, TopologyBuilder, TopologyConfig}, + generation::GeneratedTopology, + readiness::{ManualNetworkReadiness, ReadinessCheck, ReadinessError, ReadinessNode}, + }, +}; +use thiserror::Error; + +use crate::{ + docker::{commands::compose_up_service, ensure_docker_available}, + errors::ComposeRunnerError, + infrastructure::{ + environment::{StackEnvironment, ensure_supported_topology, prepare_environment_manual}, + ports::{NodeHostPorts, compose_runner_host, node_identifier, resolve_service_port_with}, + }, +}; + +mod network; +mod readiness; +mod state; + +use network::api_client_from_host_ports; +use readiness::readiness_nodes; +use state::{ManualClusterState, next_available_index, normalize_label}; + +#[derive(Debug, Error)] +pub enum ManualClusterError { + #[error("failed to build topology: {source}")] + Build { + #[source] + source: TopologyBuildError, + }, + #[error(transparent)] + Compose(#[from] ComposeRunnerError), + #[error("manual compose cluster only supports default peer selection")] + UnsupportedPeers, + #[error("manual compose cluster does not support config patches")] + UnsupportedConfigPatch, + #[error("cluster has already been stopped")] + Stopped, + #[error("node name '{name}' already exists")] + NameExists { name: String }, + #[error("node index {index} is out of range (max {max})")] + IndexOutOfRange { index: usize, max: usize }, + #[error("node index {index} already started")] + AlreadyStarted { index: usize }, + #[error("no available nodes to start")] + NoAvailableNodes, + #[error("node name cannot be empty")] + EmptyName, +} + +struct EnvironmentSnapshot { + compose_path: PathBuf, + project_name: String, + root: PathBuf, +} + +/// Imperative, compose-backed cluster that can start nodes on demand. +pub struct ComposeManualCluster { + descriptors: GeneratedTopology, + environment: Mutex>, + state: Mutex, + host: String, +} + +impl ComposeManualCluster { + pub(crate) async fn from_config(config: TopologyConfig) -> Result { + let builder = TopologyBuilder::new(config); + let descriptors = builder + .build() + .map_err(|source| ManualClusterError::Build { source })?; + + ensure_supported_topology(&descriptors)?; + + ensure_docker_available().await?; + let environment = prepare_environment_manual(&descriptors, None).await?; + + let total_nodes = descriptors.nodes().len(); + + Ok(Self { + descriptors, + environment: Mutex::new(Some(environment)), + state: Mutex::new(ManualClusterState::new(total_nodes)), + host: compose_runner_host(), + }) + } + + fn lock_state(&self) -> std::sync::MutexGuard<'_, ManualClusterState> { + self.state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + } + + fn resolve_label_and_index( + &self, + name: &str, + total_nodes: usize, + ) -> Result<(usize, String), ManualClusterError> { + let state = self.lock_state(); + + let label = normalize_label(name).ok_or(ManualClusterError::EmptyName)?; + let index = match state.name_to_index.get(&label).copied() { + Some(index) => index, + None => next_available_index(&state.started_indices, total_nodes) + .ok_or(ManualClusterError::NoAvailableNodes)?, + }; + + let node_label = label; + + if state.clients_by_name.contains_key(&node_label) { + return Err(ManualClusterError::NameExists { name: node_label }); + } + + if state.started_indices.contains(&index) { + return Err(ManualClusterError::AlreadyStarted { index }); + } + + Ok((index, node_label)) + } + + #[must_use] + pub fn node_client(&self, name: &str) -> Option { + if name.trim().is_empty() { + return None; + } + + let state = self.lock_state(); + + normalize_label(name).and_then(|label| state.clients_by_name.get(&label).cloned()) + } + + pub async fn start_node(&self, name: &str) -> Result { + self.start_node_with(name, StartNodeOptions::default()) + .await + } + + pub async fn start_node_with( + &self, + name: &str, + options: StartNodeOptions, + ) -> Result { + if !matches!(options.peers, PeerSelection::DefaultLayout) { + return Err(ManualClusterError::UnsupportedPeers); + } + + if options.config_patch.is_some() { + return Err(ManualClusterError::UnsupportedConfigPatch); + } + + let total_nodes = self.descriptors.nodes().len(); + let (index, node_label) = self.resolve_label_and_index(name, total_nodes)?; + + let snapshot = self.environment_snapshot()?; + let service = node_identifier(index); + + compose_up_service( + &snapshot.compose_path, + &snapshot.project_name, + &snapshot.root, + &service, + ) + .await + .map_err(ComposeRunnerError::Compose)?; + + let ports = discover_node_ports( + &snapshot.compose_path, + &snapshot.project_name, + &snapshot.root, + &service, + &self.descriptors, + index, + ) + .await?; + + let client = api_client_from_host_ports(&ports, &self.host)?; + + let mut state = self.lock_state(); + + state.register_node(index, node_label.clone(), client.clone(), ports)?; + + Ok(StartedNode { + name: node_label, + api: client, + }) + } + + pub fn stop_all(&self) { + let mut env = self + .environment + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + + let Some(environment) = env.take() else { + return; + }; + + if let Ok(cleanup) = environment.into_cleanup() { + Box::new(cleanup).cleanup(); + } + + let mut state = self.lock_state(); + state.reset(); + } + + pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> { + let nodes = self.readiness_nodes(); + if nodes.len() <= 1 { + return Ok(()); + } + + ManualNetworkReadiness::new(nodes).wait().await + } + + fn readiness_nodes(&self) -> Vec { + let state = self.lock_state(); + readiness_nodes(&state, &self.descriptors) + } + + fn environment_snapshot(&self) -> Result { + let env = self + .environment + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + + let Some(environment) = env.as_ref() else { + return Err(ManualClusterError::Stopped); + }; + + Ok(EnvironmentSnapshot { + compose_path: environment.compose_path().to_path_buf(), + project_name: environment.project_name().to_owned(), + root: environment.root().to_path_buf(), + }) + } +} + +impl Drop for ComposeManualCluster { + fn drop(&mut self) { + self.stop_all(); + } +} + +#[async_trait] +impl ManualClusterHandle for ComposeManualCluster { + async fn start_node_with( + &self, + name: &str, + options: StartNodeOptions, + ) -> Result { + self.start_node_with(name, options) + .await + .map_err(|err| err.into()) + } + + async fn wait_network_ready(&self) -> Result<(), DynError> { + self.wait_network_ready().await.map_err(|err| err.into()) + } +} + +async fn discover_node_ports( + compose_path: &Path, + project_name: &str, + root: &Path, + service: &str, + descriptors: &GeneratedTopology, + index: usize, +) -> Result { + let node = + descriptors + .nodes() + .get(index) + .ok_or_else(|| ManualClusterError::IndexOutOfRange { + index, + max: descriptors.nodes().len().saturating_sub(1), + })?; + + let api = resolve_service_port_with(compose_path, project_name, root, service, node.api_port()) + .await?; + + let testing = resolve_service_port_with( + compose_path, + project_name, + root, + service, + node.testing_http_port(), + ) + .await?; + + Ok(NodeHostPorts { api, testing }) +} diff --git a/testing-framework/deployers/compose/src/manual/network.rs b/testing-framework/deployers/compose/src/manual/network.rs new file mode 100644 index 0000000..79f7721 --- /dev/null +++ b/testing-framework/deployers/compose/src/manual/network.rs @@ -0,0 +1,29 @@ +use reqwest::Url; +use testing_framework_core::{nodes::ApiClient, scenario::http_probe}; + +use super::ManualClusterError; +use crate::{ + errors::{ComposeRunnerError, NodeClientError}, + infrastructure::ports::NodeHostPorts, +}; + +pub fn api_client_from_host_ports( + ports: &NodeHostPorts, + host: &str, +) -> Result { + let api_url = endpoint_url(host, "api", ports.api)?; + let testing_url = endpoint_url(host, "testing", ports.testing)?; + + Ok(ApiClient::from_urls(api_url, Some(testing_url))) +} + +fn endpoint_url(host: &str, endpoint: &'static str, port: u16) -> Result { + Url::parse(&format!("http://{host}:{port}/")).map_err(|source| { + ManualClusterError::Compose(ComposeRunnerError::NodeClients(NodeClientError::Endpoint { + role: http_probe::NODE_ROLE, + endpoint, + port, + source, + })) + }) +} diff --git a/testing-framework/deployers/compose/src/manual/readiness.rs b/testing-framework/deployers/compose/src/manual/readiness.rs new file mode 100644 index 0000000..f1c4b2f --- /dev/null +++ b/testing-framework/deployers/compose/src/manual/readiness.rs @@ -0,0 +1,39 @@ +use std::collections::HashSet; + +use testing_framework_core::topology::{ + generation::GeneratedTopology, + readiness::{ReadinessNode, build_readiness_nodes}, + utils::multiaddr_port, +}; + +use super::state::ManualClusterState; + +pub fn readiness_nodes( + state: &ManualClusterState, + descriptors: &GeneratedTopology, +) -> Vec { + let mut indices = state.started_indices.iter().copied().collect::>(); + indices.sort_unstable(); + + let iter = indices.into_iter().filter_map(|index| { + let api = state.clients_by_index.get(index).and_then(|c| c.clone())?; + let node = descriptors.nodes().get(index)?; + let initial_peers = node + .general + .network_config + .backend + .initial_peers + .iter() + .filter_map(multiaddr_port) + .collect::>(); + + Some(( + format!("node#{index}@{}", node.network_port()), + api, + node.network_port(), + initial_peers, + )) + }); + + build_readiness_nodes(iter) +} diff --git a/testing-framework/deployers/compose/src/manual/state.rs b/testing-framework/deployers/compose/src/manual/state.rs new file mode 100644 index 0000000..6da9796 --- /dev/null +++ b/testing-framework/deployers/compose/src/manual/state.rs @@ -0,0 +1,83 @@ +use std::collections::{HashMap, HashSet}; + +use testing_framework_core::nodes::ApiClient; + +use super::ManualClusterError; +use crate::infrastructure::ports::NodeHostPorts; + +pub struct ManualClusterState { + pub started_indices: HashSet, + pub clients_by_name: HashMap, + pub name_to_index: HashMap, + pub clients_by_index: Vec>, + pub ports_by_index: Vec>, +} + +impl ManualClusterState { + pub fn new(total_nodes: usize) -> Self { + Self { + started_indices: HashSet::new(), + clients_by_name: HashMap::new(), + name_to_index: HashMap::new(), + clients_by_index: vec![None; total_nodes], + ports_by_index: vec![None; total_nodes], + } + } + + pub fn reset(&mut self) { + self.started_indices.clear(); + self.clients_by_name.clear(); + self.name_to_index.clear(); + + for slot in &mut self.clients_by_index { + *slot = None; + } + for slot in &mut self.ports_by_index { + *slot = None; + } + } + + pub fn register_node( + &mut self, + index: usize, + label: String, + client: ApiClient, + ports: NodeHostPorts, + ) -> Result<(), ManualClusterError> { + if self.clients_by_name.contains_key(&label) { + return Err(ManualClusterError::NameExists { name: label }); + } + if self.started_indices.contains(&index) { + return Err(ManualClusterError::AlreadyStarted { index }); + } + + self.started_indices.insert(index); + self.name_to_index.insert(label.clone(), index); + self.clients_by_name.insert(label, client.clone()); + if let Some(slot) = self.clients_by_index.get_mut(index) { + *slot = Some(client); + } + if let Some(slot) = self.ports_by_index.get_mut(index) { + *slot = Some(ports); + } + + Ok(()) + } +} + +pub fn normalize_label(name: &str) -> Option { + let trimmed = name.trim(); + if trimmed.is_empty() { + return None; + } + + if trimmed.starts_with("node-") { + Some(trimmed.to_string()) + } else { + Some(format!("node-{trimmed}")) + } +} + +pub fn next_available_index(started: &HashSet, total_nodes: usize) -> Option { + (0..total_nodes).find(|index| !started.contains(index)) +} diff --git a/testing-framework/deployers/compose/tests/manual_cluster.rs b/testing-framework/deployers/compose/tests/manual_cluster.rs new file mode 100644 index 0000000..7b7997c --- /dev/null +++ b/testing-framework/deployers/compose/tests/manual_cluster.rs @@ -0,0 +1,66 @@ +use std::{path::Path, time::Duration}; + +use anyhow::Result; +use testing_framework_core::topology::config::TopologyConfig; +use testing_framework_runner_compose::ComposeDeployer; +use tokio::time::{sleep, timeout}; + +const STARTUP_TIMEOUT: Duration = Duration::from_secs(180); +const CONSENSUS_POLL_TIMEOUT: Duration = Duration::from_secs(10); + +#[tokio::test(flavor = "multi_thread")] +async fn manual_cluster_compose_single_node() -> Result<()> { + // Note: Prefer letting the image use its bundled /opt/circuits. + // If you need to override circuits, set: + // LOGOS_BLOCKCHAIN_CIRCUITS=/path/to/host/circuits + // LOGOS_BLOCKCHAIN_CIRCUITS_DOCKER=/path/to/linux/circuits + // and ensure they match the node/cfgsync versions. + unsafe { + std::env::set_var("POL_PROOF_DEV_MODE", "true"); + } + + unsafe { + std::env::set_var( + "LOGOS_BLOCKCHAIN_TESTNET_IMAGE", + "logos-blockchain-testing:local", + ); + } + + if let Ok(host_circuits) = std::env::var("LOGOS_BLOCKCHAIN_CIRCUITS") { + if !Path::new(&host_circuits).exists() { + return Err(anyhow::anyhow!( + "host circuits directory not found at {host_circuits}" + )); + } + } + + if let Ok(docker_circuits) = std::env::var("LOGOS_BLOCKCHAIN_CIRCUITS_DOCKER") { + if !Path::new(&docker_circuits).exists() { + return Err(anyhow::anyhow!( + "docker circuits directory not found at {docker_circuits}" + )); + } + } + + let config = TopologyConfig::with_node_numbers(1); + let deployer = ComposeDeployer::new(); + let cluster = deployer.manual_cluster(config).await?; + + let node = cluster.start_node("seed").await?.api; + + let start = tokio::time::Instant::now(); + loop { + match timeout(CONSENSUS_POLL_TIMEOUT, node.consensus_info()).await { + Ok(Ok(_)) => break, + Ok(Err(err)) => { + if start.elapsed() >= STARTUP_TIMEOUT { + return Err(err.into()); + } + } + Err(_) => return Err(anyhow::anyhow!("consensus_info timed out")), + } + sleep(Duration::from_secs(2)).await; + } + + Ok(()) +} diff --git a/testing-framework/deployers/local/src/manual/mod.rs b/testing-framework/deployers/local/src/manual/mod.rs index f935065..e3422ba 100644 --- a/testing-framework/deployers/local/src/manual/mod.rs +++ b/testing-framework/deployers/local/src/manual/mod.rs @@ -4,16 +4,12 @@ use testing_framework_core::{ scenario::{DynError, StartNodeOptions, StartedNode}, topology::{ config::{TopologyBuildError, TopologyBuilder, TopologyConfig}, - readiness::{ReadinessCheck, ReadinessError}, + readiness::{ManualNetworkReadiness, ReadinessCheck, ReadinessError, ReadinessNode}, }, }; use thiserror::Error; -use crate::node_control::{LocalDynamicError, LocalDynamicNodes, ReadinessNode}; - -mod readiness; - -use readiness::ManualNetworkReadiness; +use crate::node_control::{LocalDynamicError, LocalDynamicNodes}; #[derive(Debug, Error)] pub enum ManualClusterError { diff --git a/testing-framework/deployers/local/src/node_control/mod.rs b/testing-framework/deployers/local/src/node_control/mod.rs index 60827a3..40f1f1a 100644 --- a/testing-framework/deployers/local/src/node_control/mod.rs +++ b/testing-framework/deployers/local/src/node_control/mod.rs @@ -12,7 +12,8 @@ use testing_framework_core::{ }, scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode}, topology::{ - generation::{GeneratedTopology, find_expected_peer_counts}, + generation::GeneratedTopology, + readiness::{ReadinessNode, build_readiness_nodes}, utils::multiaddr_port, }, }; @@ -84,12 +85,6 @@ impl LocalDynamicSeed { } } -pub(crate) struct ReadinessNode { - pub(crate) label: String, - pub(crate) expected_peers: Option, - pub(crate) api: ApiClient, -} - impl LocalDynamicNodes { pub fn new(descriptors: GeneratedTopology, node_clients: NodeClients) -> Self { Self::new_with_seed(descriptors, node_clients, LocalDynamicSeed::default()) @@ -166,38 +161,26 @@ impl LocalDynamicNodes { .lock() .unwrap_or_else(|poisoned| poisoned.into_inner()); - let listen_ports = state - .nodes - .iter() - .map(|node| node.config().network.backend.swarm.port) - .collect::>(); + let iter = state.nodes.iter().enumerate().map(|(idx, node)| { + let config = node.config(); + let port = config.network.backend.swarm.port; + let initial_peers = config + .network + .backend + .initial_peers + .iter() + .filter_map(multiaddr_port) + .collect::>(); - let initial_peer_ports = state - .nodes - .iter() - .map(|node| { - node.config() - .network - .backend - .initial_peers - .iter() - .filter_map(multiaddr_port) - .collect::>() - }) - .collect::>(); + ( + format!("node#{idx}@{port}"), + node.api().clone(), + port, + initial_peers, + ) + }); - let expected_peer_counts = find_expected_peer_counts(&listen_ports, &initial_peer_ports); - - state - .nodes - .iter() - .enumerate() - .map(|(idx, node)| ReadinessNode { - label: format!("node#{idx}@{}", node.config().network.backend.swarm.port), - expected_peers: expected_peer_counts.get(idx).copied(), - api: node.api().clone(), - }) - .collect::>() + build_readiness_nodes(iter) } async fn start_node(