From 4ac5d07c674445b0482a05a909f44f77fa49e760 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sun, 1 Feb 2026 08:10:12 +0100 Subject: [PATCH] Add orphan manual cluster test utilities --- examples/Cargo.toml | 2 + examples/tests/orphan_manual_cluster.rs | 111 ++++++++++++++++++ .../deployers/local/src/manual/mod.rs | 10 +- .../deployers/local/src/runner.rs | 14 ++- testing-framework/workflows/src/lib.rs | 2 + testing-framework/workflows/src/manual.rs | 76 ++++++++++++ 6 files changed, 212 insertions(+), 3 deletions(-) create mode 100644 examples/tests/orphan_manual_cluster.rs create mode 100644 testing-framework/workflows/src/manual.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 44bbca2..7fb410b 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -23,5 +23,7 @@ tracing-subscriber = { features = ["env-filter", "fmt"], version = [dev-dependencies] async-trait = { workspace = true } +[features] + [lints] workspace = true diff --git a/examples/tests/orphan_manual_cluster.rs b/examples/tests/orphan_manual_cluster.rs new file mode 100644 index 0000000..f6243d5 --- /dev/null +++ b/examples/tests/orphan_manual_cluster.rs @@ -0,0 +1,111 @@ +use std::time::Duration; + +use anyhow::{Result, anyhow}; +use testing_framework_core::{ + scenario::StartNodeOptions, + topology::{ + config::{TopologyBuilder, TopologyConfig}, + configs::network::Libp2pNetworkLayout, + }, +}; +use testing_framework_runner_local::LocalDeployer; +use testing_framework_workflows::{start_node_with_timeout, wait_for_min_height}; +use tokio::time::{sleep, timeout}; +use tracing_subscriber::fmt::try_init; + +const MIN_HEIGHT: u64 = 5; +const INITIAL_READY_TIMEOUT: Duration = Duration::from_secs(500); +const CATCH_UP_TIMEOUT: Duration = Duration::from_secs(300); +const START_NODE_TIMEOUT: Duration = Duration::from_secs(90); +const TEST_TIMEOUT: Duration = Duration::from_secs(600); +const POLL_INTERVAL: Duration = Duration::from_secs(1); + +#[tokio::test] +#[ignore = "run manually with `cargo test -p runner-examples -- --ignored orphan_manual_cluster`"] +async fn orphan_manual_cluster() -> Result<()> { + let _ = try_init(); + // Required env vars (set on the command line when running this test): + // - `POL_PROOF_DEV_MODE=true` + // - `LOGOS_BLOCKCHAIN_NODE_BIN=...` + // - `NOMOS_KZGRS_PARAMS_PATH=...` (path to KZG params directory/file) + // - `RUST_LOG=info` (optional; better visibility) + + let config = TopologyConfig::with_node_numbers(3); + timeout(TEST_TIMEOUT, async { + let builder = TopologyBuilder::new(config).with_network_layout(Libp2pNetworkLayout::Full); + + let deployer = LocalDeployer::new(); + let cluster = deployer.manual_cluster_with_builder(builder)?; + // Nodes are stopped automatically when the cluster is dropped. + + let node_a = start_node_with_timeout( + &cluster, + "a", + StartNodeOptions::default(), + START_NODE_TIMEOUT, + ) + .await? + .api; + + let node_b = start_node_with_timeout( + &cluster, + "b", + StartNodeOptions::default(), + START_NODE_TIMEOUT, + ) + .await? + .api; + + wait_for_min_height( + &[node_a.clone(), node_b.clone()], + MIN_HEIGHT, + INITIAL_READY_TIMEOUT, + POLL_INTERVAL, + ) + .await?; + + let behind_node = start_node_with_timeout( + &cluster, + "c", + StartNodeOptions::default(), + START_NODE_TIMEOUT, + ) + .await? + .api; + + timeout(CATCH_UP_TIMEOUT, async { + loop { + let node_a_info = node_a + .consensus_info() + .await + .map_err(|err| anyhow!("node-a consensus_info failed: {err}"))?; + + let node_b_info = node_b + .consensus_info() + .await + .map_err(|err| anyhow!("node-b consensus_info failed: {err}"))?; + + let behind_info = behind_node + .consensus_info() + .await + .map_err(|err| anyhow!("node-c consensus_info failed: {err}"))?; + + let initial_min_height = node_a_info.height.min(node_b_info.height); + + if behind_info.height >= initial_min_height.saturating_sub(1) { + return Ok::<(), anyhow::Error>(()); + } + + sleep(POLL_INTERVAL).await; + } + }) + .await + .map_err(|_| anyhow!("timeout waiting for behind node to catch up"))??; + + Ok::<(), anyhow::Error>(()) + }) + .await + .map_err(|_| anyhow!("test timeout exceeded"))??; + + Ok(()) +} diff --git a/testing-framework/deployers/local/src/manual/mod.rs b/testing-framework/deployers/local/src/manual/mod.rs index 51a75bf..fa90a61 100644 --- a/testing-framework/deployers/local/src/manual/mod.rs +++ b/testing-framework/deployers/local/src/manual/mod.rs @@ -32,18 +32,24 @@ pub struct LocalManualCluster { } impl LocalManualCluster { - pub(crate) fn from_config(config: TopologyConfig) -> Result { - let builder = TopologyBuilder::new(config); + pub(crate) fn from_builder(builder: TopologyBuilder) -> Result { let descriptors = builder .build() .map_err(|source| ManualClusterError::Build { source })?; + let nodes = LocalNodeManager::new( descriptors, testing_framework_core::scenario::NodeClients::default(), ); + Ok(Self { nodes }) } + pub(crate) fn from_config(config: TopologyConfig) -> Result { + let builder = TopologyBuilder::new(config); + Self::from_builder(builder) + } + #[must_use] pub fn node_client(&self, name: &str) -> Option { self.nodes.node_client(name) diff --git a/testing-framework/deployers/local/src/runner.rs b/testing-framework/deployers/local/src/runner.rs index cfbb7d9..45b8888 100644 --- a/testing-framework/deployers/local/src/runner.rs +++ b/testing-framework/deployers/local/src/runner.rs @@ -7,7 +7,11 @@ use testing_framework_core::{ BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, NodeControlCapability, RunContext, Runner, Scenario, ScenarioError, spawn_block_feed, }, - topology::{config::TopologyConfig, deployment::Topology, readiness::ReadinessError}, + topology::{ + config::{TopologyBuilder, TopologyConfig}, + deployment::Topology, + readiness::ReadinessError, + }, }; use thiserror::Error; use tracing::{debug, info}; @@ -167,6 +171,14 @@ impl LocalDeployer { LocalManualCluster::from_config(config) } + /// Build a manual cluster from a pre-configured topology builder. + pub fn manual_cluster_with_builder( + &self, + builder: TopologyBuilder, + ) -> Result { + LocalManualCluster::from_builder(builder) + } + async fn prepare_topology( scenario: &Scenario, membership_check: bool, diff --git a/testing-framework/workflows/src/lib.rs b/testing-framework/workflows/src/lib.rs index 72ef0b1..0e97399 100644 --- a/testing-framework/workflows/src/lib.rs +++ b/testing-framework/workflows/src/lib.rs @@ -1,8 +1,10 @@ pub mod builder; pub mod expectations; +pub mod manual; pub mod util; pub mod workloads; pub use builder::{ChaosBuilderExt, ObservabilityBuilderExt, ScenarioBuilderExt}; pub use expectations::ConsensusLiveness; +pub use manual::{start_node_with_timeout, wait_for_min_height}; pub use workloads::transaction::TxInclusionExpectation; diff --git a/testing-framework/workflows/src/manual.rs b/testing-framework/workflows/src/manual.rs new file mode 100644 index 0000000..bbc0b9e --- /dev/null +++ b/testing-framework/workflows/src/manual.rs @@ -0,0 +1,76 @@ +use std::time::Duration; + +use testing_framework_core::{ + nodes::ApiClient, + scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode}, +}; +use thiserror::Error; +use tokio::time::{Instant, sleep, timeout}; + +#[derive(Debug, Error)] +pub enum ManualTestError { + #[error("timeout: {message}")] + Timeout { message: String }, + #[error("start node failed: {message}")] + StartNode { message: String }, + #[error("consensus_info failed: {source}")] + ConsensusInfo { + #[from] + source: reqwest::Error, + }, +} + +pub async fn start_node_with_timeout( + handle: &H, + name: &str, + options: StartNodeOptions, + timeout_duration: Duration, +) -> Result { + timeout(timeout_duration, handle.start_node_with(name, options)) + .await + .map_err(|_| ManualTestError::Timeout { + message: format!("starting node '{name}' exceeded timeout"), + })? + .map_err(|err: DynError| ManualTestError::StartNode { + message: err.to_string(), + }) +} + +pub async fn wait_for_min_height( + clients: &[ApiClient], + min_height: u64, + timeout_duration: Duration, + poll_interval: Duration, +) -> Result<(), ManualTestError> { + let start = Instant::now(); + + loop { + let mut heights = Vec::with_capacity(clients.len()); + for client in clients { + match client.consensus_info().await { + Ok(info) => heights.push(info.height), + Err(err) => { + if start.elapsed() >= timeout_duration { + return Err(ManualTestError::ConsensusInfo { source: err }); + } + sleep(poll_interval).await; + continue; + } + } + } + + if heights.len() == clients.len() && heights.iter().all(|height| *height >= min_height) { + return Ok(()); + } + + if start.elapsed() >= timeout_duration { + return Err(ManualTestError::Timeout { + message: format!( + "min height {min_height} not reached before timeout; heights={heights:?}" + ), + }); + } + + sleep(poll_interval).await; + } +}