Common utilities

Framework common utilities
This commit is contained in:
Andrus Salumets 2026-02-02 05:10:12 +01:00 committed by GitHub
commit 4b43cb590b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 212 additions and 3 deletions

View File

@ -23,5 +23,7 @@ tracing-subscriber = { features = ["env-filter", "fmt"], version =
[dev-dependencies]
async-trait = { workspace = true }
[features]
[lints]
workspace = true

View File

@ -0,0 +1,111 @@
use std::time::Duration;
use anyhow::{Result, anyhow};
use testing_framework_core::{
scenario::StartNodeOptions,
topology::{
config::{TopologyBuilder, TopologyConfig},
configs::network::Libp2pNetworkLayout,
},
};
use testing_framework_runner_local::LocalDeployer;
use testing_framework_workflows::{start_node_with_timeout, wait_for_min_height};
use tokio::time::{sleep, timeout};
use tracing_subscriber::fmt::try_init;
// Chain height every node must reach before the test proceeds.
const MIN_HEIGHT: u64 = 5;
// Deadline for the two initial nodes to reach `MIN_HEIGHT`.
const INITIAL_READY_TIMEOUT: Duration = Duration::from_secs(500);
// Deadline for the late-started node to sync up with its peers.
const CATCH_UP_TIMEOUT: Duration = Duration::from_secs(300);
// Per-node startup deadline.
const START_NODE_TIMEOUT: Duration = Duration::from_secs(90);
// Overall deadline for the whole test body.
const TEST_TIMEOUT: Duration = Duration::from_secs(600);
// Delay between successive consensus-info polls.
const POLL_INTERVAL: Duration = Duration::from_secs(1);
/// Manual-cluster scenario: two nodes make consensus progress, then a third
/// node starts well behind and must catch up to within one block of its
/// peers before `CATCH_UP_TIMEOUT` elapses.
#[tokio::test]
#[ignore = "run manually with `cargo test -p runner-examples -- --ignored orphan_manual_cluster`"]
async fn orphan_manual_cluster() -> Result<()> {
    // Another test may have installed a tracing subscriber already; ignore
    // the error in that case.
    let _ = try_init();
    // Required env vars (set on the command line when running this test):
    // - `POL_PROOF_DEV_MODE=true`
    // - `LOGOS_BLOCKCHAIN_NODE_BIN=...`
    // - `NOMOS_KZGRS_PARAMS_PATH=...` (path to KZG params directory/file)
    // - `RUST_LOG=info` (optional; better visibility)
    let config = TopologyConfig::with_node_numbers(3);
    timeout(TEST_TIMEOUT, async {
        let builder = TopologyBuilder::new(config).with_network_layout(Libp2pNetworkLayout::Full);
        let deployer = LocalDeployer::new();
        let cluster = deployer.manual_cluster_with_builder(builder)?;
        // Nodes are stopped automatically when the cluster is dropped.

        // Bring up the first two nodes.
        let node_a = start_node_with_timeout(
            &cluster,
            "a",
            StartNodeOptions::default(),
            START_NODE_TIMEOUT,
        )
        .await?
        .api;
        let node_b = start_node_with_timeout(
            &cluster,
            "b",
            StartNodeOptions::default(),
            START_NODE_TIMEOUT,
        )
        .await?
        .api;
        // Let them make consensus progress before the late joiner appears.
        wait_for_min_height(
            &[node_a.clone(), node_b.clone()],
            MIN_HEIGHT,
            INITIAL_READY_TIMEOUT,
            POLL_INTERVAL,
        )
        .await?;
        // Start the third node behind its peers; it must sync up.
        let behind_node = start_node_with_timeout(
            &cluster,
            "c",
            StartNodeOptions::default(),
            START_NODE_TIMEOUT,
        )
        .await?
        .api;
        timeout(CATCH_UP_TIMEOUT, async {
            loop {
                let node_a_info = node_a
                    .consensus_info()
                    .await
                    .map_err(|err| anyhow!("node-a consensus_info failed: {err}"))?;
                let node_b_info = node_b
                    .consensus_info()
                    .await
                    .map_err(|err| anyhow!("node-b consensus_info failed: {err}"))?;
                let behind_info = behind_node
                    .consensus_info()
                    .await
                    .map_err(|err| anyhow!("node-c consensus_info failed: {err}"))?;
                // Minimum height of the two up-to-date peers, re-sampled on
                // every poll. (Renamed from `initial_min_height`, which was
                // misleading: the value tracks the peers' *current* progress,
                // not a snapshot taken at catch-up start.)
                let peer_min_height = node_a_info.height.min(node_b_info.height);
                // Allow a one-block lag: the peers keep producing blocks
                // while we poll.
                if behind_info.height >= peer_min_height.saturating_sub(1) {
                    return Ok::<(), anyhow::Error>(());
                }
                sleep(POLL_INTERVAL).await;
            }
        })
        .await
        .map_err(|_| anyhow!("timeout waiting for behind node to catch up"))??;
        Ok::<(), anyhow::Error>(())
    })
    .await
    .map_err(|_| anyhow!("test timeout exceeded"))??;
    Ok(())
}

View File

@ -32,18 +32,24 @@ pub struct LocalManualCluster {
}
impl LocalManualCluster {
pub(crate) fn from_config(config: TopologyConfig) -> Result<Self, ManualClusterError> {
let builder = TopologyBuilder::new(config);
pub(crate) fn from_builder(builder: TopologyBuilder) -> Result<Self, ManualClusterError> {
let descriptors = builder
.build()
.map_err(|source| ManualClusterError::Build { source })?;
let nodes = LocalNodeManager::new(
descriptors,
testing_framework_core::scenario::NodeClients::default(),
);
Ok(Self { nodes })
}
pub(crate) fn from_config(config: TopologyConfig) -> Result<Self, ManualClusterError> {
let builder = TopologyBuilder::new(config);
Self::from_builder(builder)
}
#[must_use]
pub fn node_client(&self, name: &str) -> Option<ApiClient> {
self.nodes.node_client(name)

View File

@ -7,7 +7,11 @@ use testing_framework_core::{
BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, NodeControlCapability,
RunContext, Runner, Scenario, ScenarioError, spawn_block_feed,
},
topology::{config::TopologyConfig, deployment::Topology, readiness::ReadinessError},
topology::{
config::{TopologyBuilder, TopologyConfig},
deployment::Topology,
readiness::ReadinessError,
},
};
use thiserror::Error;
use tracing::{debug, info};
@ -167,6 +171,14 @@ impl LocalDeployer {
LocalManualCluster::from_config(config)
}
/// Build a manual cluster from a pre-configured topology builder.
pub fn manual_cluster_with_builder(
&self,
builder: TopologyBuilder,
) -> Result<LocalManualCluster, ManualClusterError> {
LocalManualCluster::from_builder(builder)
}
async fn prepare_topology<Caps>(
scenario: &Scenario<Caps>,
membership_check: bool,

View File

@ -1,8 +1,10 @@
pub mod builder;
pub mod expectations;
pub mod manual;
pub mod util;
pub mod workloads;

// Convenience re-exports so downstream crates can reach the common entry
// points directly from the crate root.
pub use builder::{ChaosBuilderExt, ObservabilityBuilderExt, ScenarioBuilderExt};
pub use expectations::ConsensusLiveness;
pub use manual::{start_node_with_timeout, wait_for_min_height};
pub use workloads::transaction::TxInclusionExpectation;

View File

@ -0,0 +1,76 @@
use std::time::Duration;
use testing_framework_core::{
nodes::ApiClient,
scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode},
};
use thiserror::Error;
use tokio::time::{Instant, sleep, timeout};
/// Errors surfaced by the manual-cluster test helpers in this module.
#[derive(Debug, Error)]
pub enum ManualTestError {
    /// A helper-level deadline elapsed before the awaited condition held.
    #[error("timeout: {message}")]
    Timeout { message: String },
    /// The node-control handle reported a failure while starting a node.
    #[error("start node failed: {message}")]
    StartNode { message: String },
    /// A `consensus_info` HTTP query failed and the deadline had passed.
    #[error("consensus_info failed: {source}")]
    ConsensusInfo {
        #[from]
        source: reqwest::Error,
    },
}
/// Starts node `name` through `handle`, failing with
/// [`ManualTestError::Timeout`] if startup takes longer than
/// `timeout_duration`, or [`ManualTestError::StartNode`] if the handle
/// itself reports an error.
pub async fn start_node_with_timeout<H: NodeControlHandle + ?Sized>(
    handle: &H,
    name: &str,
    options: StartNodeOptions,
    timeout_duration: Duration,
) -> Result<StartedNode, ManualTestError> {
    let start_future = handle.start_node_with(name, options);
    match timeout(timeout_duration, start_future).await {
        // The startup future completed in time; translate its failure, if any.
        Ok(result) => result.map_err(|err: DynError| ManualTestError::StartNode {
            message: err.to_string(),
        }),
        // The timer fired before startup finished.
        Err(_) => Err(ManualTestError::Timeout {
            message: format!("starting node '{name}' exceeded timeout"),
        }),
    }
}
/// Waits until every client in `clients` reports a consensus height of at
/// least `min_height`.
///
/// Polls all clients once per round, sleeping `poll_interval` between
/// rounds. A failed `consensus_info` call is treated as transient and the
/// round is retried.
///
/// # Errors
/// - [`ManualTestError::ConsensusInfo`] if the deadline passes while a
///   client is still erroring.
/// - [`ManualTestError::Timeout`] if the deadline passes with all clients
///   reachable but the height not yet reached.
pub async fn wait_for_min_height(
    clients: &[ApiClient],
    min_height: u64,
    timeout_duration: Duration,
    poll_interval: Duration,
) -> Result<(), ManualTestError> {
    let start = Instant::now();
    loop {
        let mut heights = Vec::with_capacity(clients.len());
        let mut last_error = None;
        for client in clients {
            match client.consensus_info().await {
                Ok(info) => heights.push(info.height),
                Err(err) => {
                    // Abort this round on the first failure: the remaining
                    // results would be discarded anyway. (The previous
                    // version kept polling the other clients and slept twice
                    // per failed round — once here and once at the bottom of
                    // the outer loop.)
                    last_error = Some(err);
                    break;
                }
            }
        }
        // Success requires a complete, error-free round with every height
        // at or above the target.
        if last_error.is_none() && heights.iter().all(|height| *height >= min_height) {
            return Ok(());
        }
        if start.elapsed() >= timeout_duration {
            // Surface the query error if one ended the final round;
            // otherwise report the heights we did observe.
            return Err(match last_error {
                Some(source) => ManualTestError::ConsensusInfo { source },
                None => ManualTestError::Timeout {
                    message: format!(
                        "min height {min_height} not reached before timeout; heights={heights:?}"
                    ),
                },
            });
        }
        sleep(poll_interval).await;
    }
}