Remove DA workload usage from framework

andrussal 2026-01-19 02:28:49 +01:00
parent 7038aaa201
commit 5304c3e808
9 changed files with 2252 additions and 1352 deletions


@@ -15,8 +15,8 @@ ignore = [
    "RUSTSEC-2025-0012", # backoff unmaintained; upstream workspace still relies on it
    "RUSTSEC-2025-0055", # tracing-subscriber ansi escape issue; upstream dependency
    "RUSTSEC-2025-0134", # rustls-pemfile unmaintained; transitive via rustls stack
    "RUSTSEC-2025-0141", # bincode unmaintained; no safe upgrade available
    "RUSTSEC-2026-0002", # lru IterMut soundness issue; pending upstream upgrade
    "RUSTSEC-2025-0141", # bincode unmaintained; upstream dependency
    "RUSTSEC-2026-0002", # lru unsound; upstream dependency until upgrade available
]
yanked = "deny"
@@ -31,6 +31,7 @@ allow = [
    "BSD-2-Clause",
    "BSD-3-Clause",
    "BSL-1.0",
    "BlueOak-1.0.0",
    "CC0-1.0",
    "CDLA-Permissive-2.0",
    "ISC",

Cargo.lock generated (3033 changed lines)

File diff suppressed because it is too large


@@ -0,0 +1,31 @@
Feature: Testing Framework - Auto Local/Compose Deployer

  Scenario: Run auto deployer smoke scenario (tx + liveness)
    Given we have a CLI deployer specified
    And topology has 1 validators and 1 executors
    And run duration is 60 seconds
    And wallets total funds is 1000000000 split across 50 users
    And transactions rate is 1 per block
    And expect consensus liveness
    When run scenario
    Then scenario should succeed

  # Note: This test may fail on slow computers
  Scenario: Run auto deployer stress smoke scenario (tx + liveness)
    Given we have a CLI deployer specified
    And topology has 3 validators and 3 executors
    And run duration is 120 seconds
    And wallets total funds is 1000000000 split across 500 users
    And transactions rate is 10 per block
    And expect consensus liveness
    When run scenario
    Then scenario should succeed

  Scenario: Run auto deployer stress smoke scenario no liveness (tx)
    Given we have a CLI deployer specified
    And topology has 3 validators and 3 executors
    And run duration is 120 seconds
    And wallets total funds is 1000000000 split across 500 users
    And transactions rate is 10 per block
    When run scenario
    Then scenario should succeed
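The step definition behind `Given we have a CLI deployer specified` is not among the files shown in this commit view. A minimal sketch of how it could be wired to the world, assuming the deployer is selected through an environment variable (TESTING_FRAMEWORK_DEPLOYER is a hypothetical name; the real step may read a CLI flag instead) and parsed with the `parse_deployer` helper exported by the world module:

use cucumber::given;

use crate::world::{parse_deployer, StepResult, TestingFrameworkWorld};

// Hypothetical variable name; the real harness may use a different source.
const DEPLOYER_ENV: &str = "TESTING_FRAMEWORK_DEPLOYER";

#[given(expr = "we have a CLI deployer specified")]
async fn cli_deployer_specified(world: &mut TestingFrameworkWorld) -> StepResult {
    // Fall back to the local deployer when nothing is specified.
    let raw = std::env::var(DEPLOYER_ENV).unwrap_or_else(|_| "local".to_owned());
    let kind = parse_deployer(&raw)?;
    world.set_deployer(kind)
}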


@@ -0,0 +1,12 @@
@compose
Feature: Testing Framework - Compose Runner

  Scenario: Run a compose smoke scenario (tx + liveness)
    Given deployer is "compose"
    And topology has 1 validators and 1 executors
    And wallets total funds is 1000 split across 10 users
    And run duration is 60 seconds
    And transactions rate is 1 per block
    And expect consensus liveness
    When run scenario
    Then scenario should succeed


@@ -0,0 +1,12 @@
@local
Feature: Testing Framework - Local Runner

  Scenario: Run a local smoke scenario (tx + liveness)
    Given deployer is "local"
    And topology has 1 validators and 1 executors
    And run duration is 60 seconds
    And wallets total funds is 1000000000 split across 50 users
    And transactions rate is 1 per block
    And expect consensus liveness
    When run scenario
    Then scenario should succeed
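The compose and local features are gated by the `@compose` and `@local` feature tags, but the harness entry point that discovers and filters them is not part of this view. A plausible sketch, assuming a tokio-based cucumber main, a hypothetical CUCUMBER_TAG selector variable, and a tests/features directory (all assumptions), using the cucumber crate's filter_run:

use cucumber::World as _;

use crate::world::TestingFrameworkWorld;

#[tokio::main]
async fn main() {
    // Hypothetical tag selector; the real harness may instead ship one
    // binary entry point per tag.
    let tag = std::env::var("CUCUMBER_TAG").unwrap_or_else(|_| "local".to_owned());
    TestingFrameworkWorld::cucumber()
        .filter_run("tests/features", move |feature, _rule, _scenario| {
            // Run only features carrying the selected feature-level tag.
            feature.tags.iter().any(|t| t == &tag)
        })
        .await;
}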


@@ -0,0 +1,72 @@
use cucumber::{then, when};
use testing_framework_core::scenario::Deployer as _;
use testing_framework_runner_compose::ComposeDeployer;
use testing_framework_runner_local::LocalDeployer;

use crate::world::{DeployerKind, StepError, StepResult, TestingFrameworkWorld};

#[when(expr = "run scenario")]
async fn run_scenario(world: &mut TestingFrameworkWorld) -> StepResult {
    let deployer = world.deployer.ok_or(StepError::MissingDeployer)?;
    world.run.result = Some(match deployer {
        DeployerKind::Local => {
            let mut scenario = world.build_local_scenario()?;
            let deployer = LocalDeployer::default();
            let result = async {
                let runner = deployer
                    .deploy(&scenario)
                    .await
                    .map_err(|e| StepError::RunFailed {
                        message: format!("local deploy failed: {e}"),
                    })?;
                runner
                    .run(&mut scenario)
                    .await
                    .map_err(|e| StepError::RunFailed {
                        message: format!("scenario run failed: {e}"),
                    })?;
                Ok::<(), StepError>(())
            }
            .await;
            result.map_err(|e| e.to_string())
        }
        DeployerKind::Compose => {
            let mut scenario = world.build_compose_scenario()?;
            let deployer = ComposeDeployer::default().with_readiness(world.readiness_checks);
            let result = async {
                let runner = deployer
                    .deploy(&scenario)
                    .await
                    .map_err(|e| StepError::RunFailed {
                        message: format!("compose deploy failed: {e}"),
                    })?;
                runner
                    .run(&mut scenario)
                    .await
                    .map_err(|e| StepError::RunFailed {
                        message: format!("scenario run failed: {e}"),
                    })?;
                Ok::<(), StepError>(())
            }
            .await;
            result.map_err(|e| e.to_string())
        }
    });
    Ok(())
}

#[then(expr = "scenario should succeed")]
async fn scenario_should_succeed(world: &mut TestingFrameworkWorld) -> StepResult {
    match world.run.result.take() {
        Some(Ok(())) => Ok(()),
        Some(Err(message)) => Err(StepError::RunFailed { message }),
        None => Err(StepError::RunFailed {
            message: "scenario was not run".to_owned(),
        }),
    }
}


@@ -0,0 +1,41 @@
use cucumber::given;

use crate::world::{StepResult, TestingFrameworkWorld};

#[given(expr = "wallets total funds is {int} split across {int} users")]
async fn wallets_total_funds(
    world: &mut TestingFrameworkWorld,
    total_funds: u64,
    users: usize,
) -> StepResult {
    world.set_wallets(total_funds, users)
}

#[given(expr = "run duration is {int} seconds")]
async fn run_duration(world: &mut TestingFrameworkWorld, seconds: u64) -> StepResult {
    world.set_run_duration(seconds)
}

#[given(expr = "transactions rate is {int} per block")]
async fn tx_rate(world: &mut TestingFrameworkWorld, rate: u64) -> StepResult {
    world.set_transactions_rate(rate, None)
}

#[given(expr = "transactions rate is {int} per block using {int} users")]
async fn tx_rate_with_users(
    world: &mut TestingFrameworkWorld,
    rate: u64,
    users: usize,
) -> StepResult {
    world.set_transactions_rate(rate, Some(users))
}

#[given(expr = "expect consensus liveness")]
async fn expect_consensus_liveness(world: &mut TestingFrameworkWorld) -> StepResult {
    world.enable_consensus_liveness()
}

#[given(expr = "consensus liveness lag allowance is {int}")]
async fn liveness_lag_allowance(world: &mut TestingFrameworkWorld, blocks: u64) -> StepResult {
    world.set_consensus_liveness_lag_allowance(blocks)
}
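Two steps used by the feature files, `deployer is {string}` and `topology has {int} validators and {int} executors`, are presumably defined in a sibling module not shown in this view. A sketch consistent with the world API shown below (set_deployer, set_topology, parse_deployer, and the single NetworkKind::Star variant are all real; only the placement here is assumed):

use cucumber::given;

use crate::world::{parse_deployer, NetworkKind, StepResult, TestingFrameworkWorld};

#[given(expr = "deployer is {string}")]
async fn deployer_is(world: &mut TestingFrameworkWorld, value: String) -> StepResult {
    world.set_deployer(parse_deployer(&value)?)
}

#[given(expr = "topology has {int} validators and {int} executors")]
async fn topology(
    world: &mut TestingFrameworkWorld,
    validators: usize,
    executors: usize,
) -> StepResult {
    // Star is the only NetworkKind variant defined in the world module.
    world.set_topology(validators, executors, NetworkKind::Star)
}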


@@ -0,0 +1,331 @@
use std::{env, path::PathBuf, time::Duration};

use cucumber::World;
use testing_framework_core::scenario::{
    Builder, NodeControlCapability, Scenario, ScenarioBuildError, ScenarioBuilder,
};
use testing_framework_workflows::{ScenarioBuilderExt as _, expectations::ConsensusLiveness};
use thiserror::Error;

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum DeployerKind {
    #[default]
    Local,
    Compose,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NetworkKind {
    Star,
}

#[derive(Debug, Default, Clone)]
pub struct RunState {
    pub result: Option<Result<(), String>>,
}

#[derive(Debug, Default, Clone, Copy)]
pub struct ScenarioSpec {
    pub topology: Option<TopologySpec>,
    pub duration_secs: Option<u64>,
    pub wallets: Option<WalletSpec>,
    pub transactions: Option<TransactionSpec>,
    pub consensus_liveness: Option<ConsensusLivenessSpec>,
}

#[derive(Debug, Clone, Copy)]
pub struct TopologySpec {
    pub validators: usize,
    pub executors: usize,
    pub network: NetworkKind,
}

#[derive(Debug, Clone, Copy)]
pub struct WalletSpec {
    pub total_funds: u64,
    pub users: usize,
}

#[derive(Debug, Clone, Copy)]
pub struct TransactionSpec {
    pub rate_per_block: u64,
    pub users: Option<usize>,
}

#[derive(Debug, Clone, Copy)]
pub struct ConsensusLivenessSpec {
    pub lag_allowance: Option<u64>,
}

#[derive(Debug, Error)]
pub enum StepError {
    #[error("deployer is not selected; set it first (e.g. `Given deployer is \"local\"`)")]
    MissingDeployer,
    #[error("scenario topology is not configured")]
    MissingTopology,
    #[error("scenario run duration is not configured")]
    MissingRunDuration,
    #[error("unsupported deployer kind: {value}")]
    UnsupportedDeployer { value: String },
    #[error("step requires deployer {expected:?}, but current deployer is {actual:?}")]
    DeployerMismatch {
        expected: DeployerKind,
        actual: DeployerKind,
    },
    #[error("invalid argument: {message}")]
    InvalidArgument { message: String },
    #[error("{message}")]
    Preflight { message: String },
    #[error("failed to build scenario: {source}")]
    ScenarioBuild {
        #[source]
        source: ScenarioBuildError,
    },
    #[error("{message}")]
    RunFailed { message: String },
}

pub type StepResult = Result<(), StepError>;

#[derive(World, Debug, Default)]
pub struct TestingFrameworkWorld {
    pub deployer: Option<DeployerKind>,
    pub spec: ScenarioSpec,
    pub run: RunState,
    pub readiness_checks: bool,
}

impl TestingFrameworkWorld {
    pub fn set_deployer(&mut self, kind: DeployerKind) -> StepResult {
        self.deployer = Some(kind);
        Ok(())
    }

    pub fn set_topology(
        &mut self,
        validators: usize,
        executors: usize,
        network: NetworkKind,
    ) -> StepResult {
        self.spec.topology = Some(TopologySpec {
            validators: positive_usize("validators", validators)?,
            executors,
            network,
        });
        Ok(())
    }

    pub fn set_run_duration(&mut self, seconds: u64) -> StepResult {
        self.spec.duration_secs = Some(positive_u64("duration", seconds)?);
        Ok(())
    }

    pub fn set_wallets(&mut self, total_funds: u64, users: usize) -> StepResult {
        self.spec.wallets = Some(WalletSpec {
            total_funds,
            users: positive_usize("wallet users", users)?,
        });
        Ok(())
    }

    pub fn set_transactions_rate(
        &mut self,
        rate_per_block: u64,
        users: Option<usize>,
    ) -> StepResult {
        if self.spec.transactions.is_some() {
            return Err(StepError::InvalidArgument {
                message: "transactions workload already configured".to_owned(),
            });
        }
        if users.is_some_and(|u| u == 0) {
            return Err(StepError::InvalidArgument {
                message: "transactions users must be > 0".to_owned(),
            });
        }
        self.spec.transactions = Some(TransactionSpec {
            rate_per_block: positive_u64("transactions rate", rate_per_block)?,
            users,
        });
        Ok(())
    }

    pub fn enable_consensus_liveness(&mut self) -> StepResult {
        if self.spec.consensus_liveness.is_none() {
            self.spec.consensus_liveness = Some(ConsensusLivenessSpec {
                lag_allowance: None,
            });
        }
        Ok(())
    }

    pub fn set_consensus_liveness_lag_allowance(&mut self, blocks: u64) -> StepResult {
        let blocks = positive_u64("lag allowance", blocks)?;
        self.spec.consensus_liveness = Some(ConsensusLivenessSpec {
            lag_allowance: Some(blocks),
        });
        Ok(())
    }

    pub fn build_local_scenario(&self) -> Result<Scenario<()>, StepError> {
        self.preflight(DeployerKind::Local)?;
        let builder = self.make_builder_for_deployer::<()>(DeployerKind::Local)?;
        builder
            .build()
            .map_err(|source| StepError::ScenarioBuild { source })
    }

    pub fn build_compose_scenario(&self) -> Result<Scenario<NodeControlCapability>, StepError> {
        self.preflight(DeployerKind::Compose)?;
        let builder =
            self.make_builder_for_deployer::<NodeControlCapability>(DeployerKind::Compose)?;
        builder
            .build()
            .map_err(|source| StepError::ScenarioBuild { source })
    }

    pub fn preflight(&self, expected: DeployerKind) -> Result<(), StepError> {
        let actual = self.deployer.ok_or(StepError::MissingDeployer)?;
        if actual != expected {
            return Err(StepError::DeployerMismatch { expected, actual });
        }
        if !is_truthy_env("POL_PROOF_DEV_MODE") {
            return Err(StepError::Preflight {
                message:
                    "POL_PROOF_DEV_MODE must be set to \"true\" (or \"1\") for practical test runs."
                        .to_owned(),
            });
        }
        if expected == DeployerKind::Local {
            let node_ok = env::var_os("NOMOS_NODE_BIN")
                .map(PathBuf::from)
                .is_some_and(|p| p.is_file())
                || shared_host_bin_path("nomos-node").is_file();
            let requires_executor_bin = self
                .spec
                .topology
                .is_some_and(|topology| topology.executors > 0);
            let exec_ok = if requires_executor_bin {
                env::var_os("NOMOS_EXECUTOR_BIN")
                    .map(PathBuf::from)
                    .is_some_and(|p| p.is_file())
                    || shared_host_bin_path("nomos-executor").is_file()
            } else {
                true
            };
            if !(node_ok && exec_ok) {
                return Err(StepError::Preflight {
                    message: "Missing Logos host binaries. Set NOMOS_NODE_BIN (and NOMOS_EXECUTOR_BIN if your scenario uses executors), or run `scripts/run/run-examples.sh host` to restore them into `testing-framework/assets/stack/bin`.".to_owned(),
                });
            }
        }
        Ok(())
    }

    fn make_builder_for_deployer<Caps: Default>(
        &self,
        expected: DeployerKind,
    ) -> Result<Builder<Caps>, StepError> {
        let actual = self.deployer.ok_or(StepError::MissingDeployer)?;
        if actual != expected {
            return Err(StepError::DeployerMismatch { expected, actual });
        }
        let topology = self.spec.topology.ok_or(StepError::MissingTopology)?;
        let duration_secs = self
            .spec
            .duration_secs
            .ok_or(StepError::MissingRunDuration)?;
        let mut builder: Builder<Caps> = make_builder(topology).with_capabilities(Caps::default());
        builder = builder.with_run_duration(Duration::from_secs(duration_secs));
        if let Some(wallets) = self.spec.wallets {
            builder = builder.initialize_wallet(wallets.total_funds, wallets.users);
        }
        if let Some(tx) = self.spec.transactions {
            builder = builder.transactions_with(|flow| {
                let mut flow = flow.rate(tx.rate_per_block);
                if let Some(users) = tx.users {
                    flow = flow.users(users);
                }
                flow
            });
        }
        if let Some(liveness) = self.spec.consensus_liveness {
            if let Some(lag) = liveness.lag_allowance {
                builder =
                    builder.with_expectation(ConsensusLiveness::default().with_lag_allowance(lag));
            } else {
                builder = builder.expect_consensus_liveness();
            }
        }
        Ok(builder)
    }
}

fn make_builder(topology: TopologySpec) -> Builder<()> {
    ScenarioBuilder::topology_with(|t| {
        let base = match topology.network {
            NetworkKind::Star => t.network_star(),
        };
        base.validators(topology.validators)
            .executors(topology.executors)
    })
}

fn is_truthy_env(key: &str) -> bool {
    env::var(key)
        .ok()
        .is_some_and(|value| matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "YES"))
}

fn positive_usize(label: &str, value: usize) -> Result<usize, StepError> {
    if value == 0 {
        Err(StepError::InvalidArgument {
            message: format!("{label} must be > 0"),
        })
    } else {
        Ok(value)
    }
}

fn positive_u64(label: &str, value: u64) -> Result<u64, StepError> {
    if value == 0 {
        Err(StepError::InvalidArgument {
            message: format!("{label} must be > 0"),
        })
    } else {
        Ok(value)
    }
}

pub fn parse_deployer(value: &str) -> Result<DeployerKind, StepError> {
    match value.trim().to_ascii_lowercase().as_str() {
        "local" | "host" => Ok(DeployerKind::Local),
        "compose" | "docker" => Ok(DeployerKind::Compose),
        other => Err(StepError::UnsupportedDeployer {
            value: other.to_owned(),
        }),
    }
}

pub fn shared_host_bin_path(binary_name: &str) -> PathBuf {
    let cucumber_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    cucumber_dir.join("../assets/stack/bin").join(binary_name)
}
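For reference, a sketch of driving this world directly, mirroring the steps of the local smoke feature above. It uses only the API shown in this file and assumes the preflight environment (POL_PROOF_DEV_MODE and the host binaries) is already prepared:

// Sketch: programmatic equivalent of the "local smoke scenario" steps.
fn build_smoke_scenario() -> Result<(), StepError> {
    let mut world = TestingFrameworkWorld::default();
    world.set_deployer(DeployerKind::Local)?;
    world.set_topology(1, 1, NetworkKind::Star)?;
    world.set_run_duration(60)?;
    world.set_wallets(1_000_000_000, 50)?;
    world.set_transactions_rate(1, None)?;
    world.enable_consensus_liveness()?;
    // Runs preflight checks, then assembles the builder and builds.
    let _scenario = world.build_local_scenario()?;
    Ok(())
}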


@@ -1,13 +1,10 @@
use std::sync::Arc;
use async_trait::async_trait;
use testing_framework_core::{
    scenario::{
        BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, NodeControlCapability,
        RunContext, Runner, Scenario, ScenarioError, spawn_block_feed,
        BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, RunContext, Runner,
        Scenario, ScenarioError, spawn_block_feed,
    },
    topology::{
        config::TopologyConfig,
        deployment::{SpawnTopologyError, Topology},
        readiness::ReadinessError,
    },
@@ -15,10 +12,6 @@ use testing_framework_core::{
use thiserror::Error;
use tracing::{debug, info};
use crate::{
    manual::{LocalManualCluster, ManualClusterError},
    node_control::{LocalDynamicNodes, LocalDynamicSeed},
};
/// Spawns validators and executors as local processes, reusing the existing
/// integration harness.
#[derive(Clone)]
@@ -89,44 +82,6 @@ impl Deployer<()> for LocalDeployer {
    }
}
#[async_trait]
impl Deployer<NodeControlCapability> for LocalDeployer {
    type Error = LocalDeployerError;
    async fn deploy(
        &self,
        scenario: &Scenario<NodeControlCapability>,
    ) -> Result<Runner, Self::Error> {
        info!(
            validators = scenario.topology().validators().len(),
            executors = scenario.topology().executors().len(),
            "starting local deployment with node control"
        );
        let topology = Self::prepare_topology(scenario).await?;
        let node_clients = NodeClients::from_topology(scenario.topology(), &topology);
        let node_control = Arc::new(LocalDynamicNodes::new_with_seed(
            scenario.topology().clone(),
            node_clients.clone(),
            LocalDynamicSeed::from_topology(scenario.topology()),
        ));
        let (block_feed, block_feed_guard) = spawn_block_feed_with(&node_clients).await?;
        let context = RunContext::new(
            scenario.topology().clone(),
            Some(topology),
            node_clients,
            scenario.duration(),
            Metrics::empty(),
            block_feed,
            Some(node_control),
        );
        Ok(Runner::new(context, Some(Box::new(block_feed_guard))))
    }
}
impl LocalDeployer {
    #[must_use]
    /// Construct a local deployer.
@@ -134,25 +89,13 @@ impl LocalDeployer {
        Self::default()
    }
    /// Build a manual cluster using this deployer's local implementation.
    pub fn manual_cluster(
        &self,
        config: TopologyConfig,
    ) -> Result<LocalManualCluster, ManualClusterError> {
        LocalManualCluster::from_config(config)
    }
    async fn prepare_topology<Caps>(
        scenario: &Scenario<Caps>,
    ) -> Result<Topology, LocalDeployerError> {
    async fn prepare_topology(scenario: &Scenario<()>) -> Result<Topology, LocalDeployerError> {
        let descriptors = scenario.topology();
        info!(
            validators = descriptors.validators().len(),
            executors = descriptors.executors().len(),
            "spawning local validators/executors"
        );
        let topology = descriptors
            .clone()
            .spawn_local()
@@ -165,7 +108,6 @@ impl LocalDeployer {
            })?;
        info!("local nodes are ready");
        Ok(topology)
    }
}
@@ -178,7 +120,6 @@ impl Default for LocalDeployer {
async fn wait_for_readiness(topology: &Topology) -> Result<(), ReadinessError> {
    info!("waiting for local network readiness");
    topology.wait_network_ready().await?;
    Ok(())
}
@@ -192,7 +133,7 @@ async fn spawn_block_feed_with(
        "selecting validator client for local block feed"
    );
    let Some(block_source_client) = node_clients.random_validator() else {
    let Some(block_source_client) = node_clients.random_validator().cloned() else {
        return Err(LocalDeployerError::WorkloadFailed {
            source: "block feed requires at least one validator".into(),
        });
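The one-line change in this last hunk appends `.cloned()`, which suggests `random_validator` hands out a borrowed client while the block feed needs an owned one. A self-contained sketch of that ownership pattern, using hypothetical stand-in types (the real NodeClients API is not shown here):

// Hypothetical stand-ins illustrating why `.cloned()` is needed: a
// borrowed &Client cannot be moved into a task that may outlive the
// borrow, so the caller clones the selected client first.
#[derive(Clone)]
struct Client(String);

struct NodeClients(Vec<Client>);

impl NodeClients {
    // Returns a reference; the caller decides whether to clone.
    fn random_validator(&self) -> Option<&Client> {
        self.0.first()
    }
}

fn main() {
    let clients = NodeClients(vec![Client("validator-0".into())]);
    // Option<&Client> -> Option<Client>: an owned value that can be
    // moved into a spawned block-feed task.
    let owned: Option<Client> = clients.random_validator().cloned();
    assert!(owned.is_some());
}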