core: make scenario/topology building fallible

This commit is contained in:
andrussal 2025-12-18 22:48:45 +01:00
parent 4da09bfe85
commit 47b4e8531d
39 changed files with 375 additions and 155 deletions

View File

@ -1,9 +1,11 @@
use std::time::Duration;
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_core::scenario::{Scenario, ScenarioBuilder};
use testing_framework_workflows::ScenarioBuilderExt;
pub fn scenario_plan() -> testing_framework_core::scenario::Scenario<()> {
use crate::SnippetResult;
pub fn scenario_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
.wallets(50)
.transactions_with(|txs| txs.rate(5).users(20))

View File

@ -1,11 +1,11 @@
use std::time::Duration;
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_core::scenario::{NodeControlCapability, Scenario, ScenarioBuilder};
use testing_framework_workflows::{ScenarioBuilderExt, workloads::chaos::RandomRestartWorkload};
pub fn random_restart_plan() -> testing_framework_core::scenario::Scenario<
testing_framework_core::scenario::NodeControlCapability,
> {
use crate::SnippetResult;
pub fn random_restart_plan() -> SnippetResult<Scenario<NodeControlCapability>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(1))
.enable_node_control()
.with_workload(RandomRestartWorkload::new(

View File

@ -1,6 +1,8 @@
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_core::scenario::{Scenario, ScenarioBuilder};
use testing_framework_workflows::ScenarioBuilderExt;
pub fn build_plan() -> testing_framework_core::scenario::Scenario<()> {
use crate::SnippetResult;
pub fn build_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(0)).build() // Construct the final Scenario
}

View File

@ -19,7 +19,7 @@ pub async fn run_test() -> Result<()> {
})
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(90))
.build();
.build()?;
let deployer = LocalDeployer::default();
let runner = deployer.deploy(&plan).await?;

View File

@ -1,7 +1,9 @@
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_core::scenario::{Scenario, ScenarioBuilder};
use testing_framework_workflows::ScenarioBuilderExt;
pub fn expectations_plan() -> testing_framework_core::scenario::Scenario<()> {
use crate::SnippetResult;
pub fn expectations_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(0))
.expect_consensus_liveness() // Assert blocks are produced continuously
.build()

View File

@ -1,9 +1,11 @@
use std::time::Duration;
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_core::scenario::{Scenario, ScenarioBuilder};
use testing_framework_workflows::ScenarioBuilderExt;
pub fn run_duration_plan() -> testing_framework_core::scenario::Scenario<()> {
use crate::SnippetResult;
pub fn run_duration_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(0))
.with_run_duration(Duration::from_secs(120)) // Run for 120 seconds
.build()

View File

@ -1,7 +1,9 @@
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_core::scenario::{Scenario, ScenarioBuilder};
use testing_framework_workflows::ScenarioBuilderExt;
pub fn transactions_plan() -> testing_framework_core::scenario::Scenario<()> {
use crate::SnippetResult;
pub fn transactions_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(0))
.wallets(50)
.transactions_with(|txs| {

View File

@ -1,7 +1,9 @@
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_core::scenario::{Scenario, ScenarioBuilder};
use testing_framework_workflows::ScenarioBuilderExt;
pub fn wallets_plan() -> testing_framework_core::scenario::Scenario<()> {
use crate::SnippetResult;
pub fn wallets_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(0))
.wallets(50) // Seed 50 funded wallet accounts
.build()

View File

@ -3,7 +3,10 @@ use std::time::Duration;
use testing_framework_core::scenario::{NodeControlCapability, ScenarioBuilder};
use testing_framework_workflows::{ChaosBuilderExt, ScenarioBuilderExt};
pub fn chaos_plan() -> testing_framework_core::scenario::Scenario<NodeControlCapability> {
use crate::SnippetResult;
pub fn chaos_plan()
-> SnippetResult<testing_framework_core::scenario::Scenario<NodeControlCapability>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
.enable_node_control() // Enable node control capability
.chaos_with(|c| {

View File

@ -1,7 +1,9 @@
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_core::scenario::{Scenario, ScenarioBuilder};
use testing_framework_workflows::ScenarioBuilderExt;
pub fn da_plan() -> testing_framework_core::scenario::Scenario<()> {
use crate::SnippetResult;
pub fn da_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(1))
.wallets(50)
.da_with(|da| {

View File

@ -6,7 +6,7 @@ use testing_framework_workflows::ScenarioBuilderExt;
pub async fn execution() -> Result<()> {
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(0))
.expect_consensus_liveness()
.build();
.build()?;
let deployer = LocalDeployer::default();
let runner = deployer.deploy(&plan).await?;

View File

@ -19,7 +19,7 @@ pub async fn aggressive_chaos_test() -> Result<()> {
})
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(180))
.build();
.build()?;
let deployer = ComposeDeployer::default();
let runner = deployer.deploy(&plan).await?;

View File

@ -15,7 +15,7 @@ pub async fn load_progression_test() -> Result<()> {
.transactions_with(|txs| txs.rate(rate).users(20))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(60))
.build();
.build()?;
let deployer = ComposeDeployer::default();
let runner = deployer.deploy(&plan).await?;

View File

@ -12,7 +12,7 @@ pub async fn sustained_load_test() -> Result<()> {
.da_with(|da| da.channel_rate(2).blob_rate(3))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(300))
.build();
.build()?;
let deployer = ComposeDeployer::default();
let runner = deployer.deploy(&plan).await?;

View File

@ -19,7 +19,7 @@ pub async fn chaos_resilience() -> Result<()> {
})
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(120))
.build();
.build()?;
let deployer = ComposeDeployer::default();
let runner = deployer.deploy(&plan).await?;

View File

@ -12,7 +12,7 @@ pub async fn da_and_transactions() -> Result<()> {
.da_with(|da| da.channel_rate(2).blob_rate(2))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(90))
.build();
.build()?;
let deployer = LocalDeployer::default();
let runner = deployer.deploy(&plan).await?;

View File

@ -9,7 +9,7 @@ pub async fn simple_consensus() -> Result<()> {
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(0))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(30))
.build();
.build()?;
let deployer = LocalDeployer::default();
let runner = deployer.deploy(&plan).await?;

View File

@ -11,7 +11,7 @@ pub async fn transaction_workload() -> Result<()> {
.transactions_with(|txs| txs.rate(5).users(10))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(60))
.build();
.build()?;
let deployer = LocalDeployer::default();
let runner = deployer.deploy(&plan).await?;

View File

@ -1,5 +1,7 @@
use testing_framework_core::scenario::ScenarioBuilder;
use crate::SnippetResult;
pub trait YourExpectationDslExt: Sized {
fn expect_your_condition(self) -> Self;
}
@ -10,8 +12,9 @@ impl<Caps> YourExpectationDslExt for testing_framework_core::scenario::Builder<C
}
}
pub fn use_in_examples() {
pub fn use_in_examples() -> SnippetResult<()> {
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(0))
.expect_your_condition()
.build();
.build()?;
Ok(())
}

View File

@ -1,5 +1,7 @@
use testing_framework_core::scenario::ScenarioBuilder;
use crate::SnippetResult;
pub struct YourWorkloadBuilder;
impl YourWorkloadBuilder {
@ -24,8 +26,9 @@ impl<Caps> YourWorkloadDslExt for testing_framework_core::scenario::Builder<Caps
}
}
pub fn use_in_examples() {
pub fn use_in_examples() -> SnippetResult<()> {
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(0))
.your_workload_with(|w| w.some_config())
.build();
.build()?;
Ok(())
}

View File

@ -1,5 +1,7 @@
#![allow(dead_code, unused_imports, unused_variables)]
pub type SnippetResult<T> = Result<T, testing_framework_core::scenario::ScenarioBuildError>;
mod architecture_overview_builder_api;
mod chaos_workloads_random_restart;
mod custom_workload_example_expectation;

View File

@ -6,7 +6,7 @@ pub async fn run_with_env_overrides() -> Result<()> {
// Uses NOMOS_DEMO_* env vars (or legacy *_DEMO_* vars)
let mut plan = ScenarioBuilder::with_node_counts(3, 2)
.with_run_duration(std::time::Duration::from_secs(120))
.build();
.build()?;
let deployer = LocalDeployer::default();
let runner = deployer.deploy(&plan).await?;

View File

@ -20,7 +20,7 @@ pub async fn run_local_demo() -> Result<()> {
})
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(60))
.build();
.build()?;
// Deploy and run
let deployer = LocalDeployer::default();

View File

@ -3,7 +3,7 @@ use testing_framework_core::scenario::{Deployer, ScenarioBuilder};
use testing_framework_runner_local::LocalDeployer;
pub async fn step_6_deploy_and_execute() -> Result<()> {
let mut plan = ScenarioBuilder::with_node_counts(1, 1).build();
let mut plan = ScenarioBuilder::with_node_counts(1, 1).build()?;
let deployer = LocalDeployer::default(); // Use local process deployer
let runner = deployer.deploy(&plan).await?; // Provision infrastructure

View File

@ -4,7 +4,7 @@ use testing_framework_runner_compose::ComposeDeployer;
pub async fn run_with_compose_deployer() -> Result<()> {
// ... same scenario definition ...
let mut plan = ScenarioBuilder::with_node_counts(1, 1).build();
let mut plan = ScenarioBuilder::with_node_counts(1, 1).build()?;
let deployer = ComposeDeployer::default(); // Use Docker Compose
let runner = deployer.deploy(&plan).await?;

View File

@ -1,16 +1,20 @@
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_workflows::ScenarioBuilderExt;
pub fn declarative_over_imperative() {
use crate::SnippetResult;
pub fn declarative_over_imperative() -> SnippetResult<()> {
// Good: declarative
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(1))
.transactions_with(|txs| {
txs.rate(5) // 5 transactions per block
})
.expect_consensus_liveness()
.build();
.build()?;
// Bad: imperative (framework doesn't work this way)
// spawn_validator(); spawn_executor();
// loop { submit_tx(); check_block(); }
Ok(())
}

View File

@ -3,14 +3,16 @@ use std::time::Duration;
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_workflows::{ChaosBuilderExt, ScenarioBuilderExt};
pub fn determinism_first() {
use crate::SnippetResult;
pub fn determinism_first() -> SnippetResult<()> {
// Separate: functional test (deterministic)
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(1))
.transactions_with(|txs| {
txs.rate(5) // 5 transactions per block
})
.expect_consensus_liveness()
.build();
.build()?;
// Separate: chaos test (introduces randomness)
let _chaos_plan =
@ -27,5 +29,6 @@ pub fn determinism_first() {
txs.rate(5) // 5 transactions per block
})
.expect_consensus_liveness()
.build();
.build()?;
Ok(())
}

View File

@ -3,17 +3,20 @@ use std::time::Duration;
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_workflows::ScenarioBuilderExt;
pub fn minimum_run_windows() {
use crate::SnippetResult;
pub fn minimum_run_windows() -> SnippetResult<()> {
// Bad: too short (~2 blocks with default 2s slots, 0.9 coeff)
let _too_short = ScenarioBuilder::with_node_counts(1, 0)
.with_run_duration(Duration::from_secs(5))
.expect_consensus_liveness()
.build();
.build()?;
// Good: enough blocks for assertions (~27 blocks with default 2s slots, 0.9
// coeff)
let _good = ScenarioBuilder::with_node_counts(1, 0)
.with_run_duration(Duration::from_secs(60))
.expect_consensus_liveness()
.build();
.build()?;
Ok(())
}

View File

@ -3,7 +3,9 @@ use std::time::Duration;
use testing_framework_core::scenario::ScenarioBuilder;
use testing_framework_workflows::ScenarioBuilderExt;
pub fn protocol_time_not_wall_time() {
use crate::SnippetResult;
pub fn protocol_time_not_wall_time() -> SnippetResult<()> {
// Good: protocol-oriented thinking
let _plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(1))
.transactions_with(|txs| {
@ -11,9 +13,11 @@ pub fn protocol_time_not_wall_time() {
})
.with_run_duration(Duration::from_secs(60)) // Let framework calculate expected blocks
.expect_consensus_liveness() // "Did we produce the expected blocks?"
.build();
.build()?;
// Bad: wall-clock assumptions
// "I expect exactly 30 blocks in 60 seconds"
// This breaks on slow CI where slot timing might drift
Ok(())
}

View File

@ -82,7 +82,7 @@ async fn run_compose_case(
.da_with(|da| da.channel_rate(DA_CHANNEL_RATE).blob_rate(DA_BLOB_RATE))
.with_run_duration(run_duration)
.expect_consensus_liveness()
.build();
.build()?;
let deployer = ComposeDeployer::new();
info!("deploying compose stack");

View File

@ -59,7 +59,7 @@ async fn run_k8s_case(validators: usize, executors: usize, run_duration: Duratio
}
}
let mut plan = scenario.build();
let mut plan = scenario.build()?;
let deployer = K8sDeployer::new();
info!("deploying k8s stack");

View File

@ -61,7 +61,7 @@ async fn run_local_case(validators: usize, executors: usize, run_duration: Durat
.expect_consensus_liveness()
};
let mut plan = scenario.build();
let mut plan = scenario.build()?;
let deployer = LocalDeployer::default().with_membership_check(true);
info!("deploying local nodes");

View File

@ -1,5 +1,6 @@
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use thiserror::Error;
use tracing::{debug, info};
use super::{
@ -7,7 +8,7 @@ use super::{
workload::Workload,
};
use crate::topology::{
config::{TopologyBuilder, TopologyConfig},
config::{TopologyBuildError, TopologyBuilder, TopologyConfig},
configs::{network::Libp2pNetworkLayout, wallet::WalletConfig},
generation::GeneratedTopology,
};
@ -16,6 +17,12 @@ const DEFAULT_FUNDS_PER_WALLET: u64 = 100;
const MIN_EXPECTATION_BLOCKS: u32 = 2;
const MIN_EXPECTATION_FALLBACK_SECS: u64 = 10;
#[derive(Debug, Error)]
pub enum ScenarioBuildError {
#[error(transparent)]
Topology(#[from] TopologyBuildError),
}
/// Immutable scenario definition shared between the runner, workloads, and
/// expectations.
pub struct Scenario<Caps = ()> {
@ -214,7 +221,7 @@ impl<Caps> Builder<Caps> {
#[must_use]
/// Finalize the scenario, computing run metrics and initializing
/// components.
pub fn build(self) -> Scenario<Caps> {
pub fn build(self) -> Result<Scenario<Caps>, ScenarioBuildError> {
let Self {
topology,
mut workloads,
@ -224,7 +231,7 @@ impl<Caps> Builder<Caps> {
..
} = self;
let generated = topology.build();
let generated = topology.build()?;
let duration = enforce_min_duration(&generated, duration);
let run_metrics = RunMetrics::from_topology(&generated, duration);
initialize_components(&generated, &run_metrics, &mut workloads, &mut expectations);
@ -238,7 +245,13 @@ impl<Caps> Builder<Caps> {
"scenario built"
);
Scenario::new(generated, workloads, expectations, duration, capabilities)
Ok(Scenario::new(
generated,
workloads,
expectations,
duration,
capabilities,
))
}
}

View File

@ -14,7 +14,9 @@ pub type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub use capabilities::{
NodeControlCapability, NodeControlHandle, ObservabilityCapability, RequiresNodeControl,
};
pub use definition::{Builder, Scenario, ScenarioBuilder, TopologyConfigurator};
pub use definition::{
Builder, Scenario, ScenarioBuildError, ScenarioBuilder, TopologyConfigurator,
};
pub use expectation::Expectation;
pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs};
pub use runtime::{

View File

@ -7,26 +7,54 @@ use nomos_core::{
use nomos_da_network_core::swarm::DAConnectionPolicySettings;
use testing_framework_config::topology::{
configs::{
api::create_api_configs,
base::{BaseConfigs, build_base_configs},
consensus::{ConsensusParams, ProviderInfo, create_genesis_tx_with_declarations},
api::{ApiConfigError, create_api_configs},
base::{BaseConfigError, BaseConfigs, build_base_configs},
consensus::{
ConsensusConfigError, ConsensusParams, ProviderInfo,
create_genesis_tx_with_declarations,
},
da::DaParams,
network::{Libp2pNetworkLayout, NetworkParams},
tracing::create_tracing_configs,
wallet::WalletConfig,
},
invariants::validate_generated_vectors,
invariants::{TopologyInvariantError, validate_generated_vectors},
};
use thiserror::Error;
use crate::topology::{
configs::{GeneralConfig, time::default_time_config},
generation::{GeneratedNodeConfig, GeneratedTopology, NodeRole},
utils::{create_kms_configs, resolve_ids, resolve_ports},
utils::{TopologyResolveError, create_kms_configs, resolve_ids, resolve_ports},
};
const DEFAULT_DA_BALANCER_INTERVAL: Duration = Duration::from_secs(1);
const VALIDATOR_EXECUTOR_DA_BALANCER_INTERVAL: Duration = Duration::from_secs(5);
#[derive(Debug, Error)]
pub enum TopologyBuildError {
#[error("topology must include at least one node")]
EmptyParticipants,
#[error(transparent)]
Invariants(#[from] TopologyInvariantError),
#[error(transparent)]
Resolve(#[from] TopologyResolveError),
#[error(transparent)]
Base(#[from] BaseConfigError),
#[error(transparent)]
Api(#[from] ApiConfigError),
#[error(transparent)]
Genesis(#[from] ConsensusConfigError),
#[error("config generation requires at least one consensus config")]
MissingConsensusConfig,
#[error("internal config vector mismatch for {label} (expected {expected}, got {actual})")]
VectorLenMismatch {
label: &'static str,
expected: usize,
actual: usize,
},
}
/// High-level topology settings used to generate node configs for a scenario.
#[derive(Clone)]
pub struct TopologyConfig {
@ -96,7 +124,6 @@ impl TopologyConfig {
/// Build a topology with explicit validator and executor counts.
pub fn with_node_numbers(validators: usize, executors: usize) -> Self {
let participants = validators + executors;
assert!(participants > 0, "topology must include at least one node");
let mut da_params = DaParams::default();
let da_nodes = participants;
@ -240,9 +267,8 @@ impl TopologyBuilder {
self
}
#[must_use]
/// Finalize and generate topology and node descriptors.
pub fn build(self) -> GeneratedTopology {
pub fn build(self) -> Result<GeneratedTopology, TopologyBuildError> {
let Self {
config,
ids,
@ -251,14 +277,15 @@ impl TopologyBuilder {
} = self;
let n_participants = config.n_validators + config.n_executors;
assert!(n_participants > 0, "topology must have at least one node");
if n_participants == 0 {
return Err(TopologyBuildError::EmptyParticipants);
}
let ids = resolve_ids(ids, n_participants);
let da_ports = resolve_ports(da_ports, n_participants, "DA");
let blend_ports = resolve_ports(blend_ports, n_participants, "Blend");
let ids = resolve_ids(ids, n_participants)?;
let da_ports = resolve_ports(da_ports, n_participants, "DA")?;
let blend_ports = resolve_ports(blend_ports, n_participants, "Blend")?;
validate_generated_vectors(n_participants, &ids, &da_ports, &blend_ports)
.expect("invalid generated topology inputs");
validate_generated_vectors(n_participants, &ids, &da_ports, &blend_ports)?;
let BaseConfigs {
mut consensus_configs,
@ -274,50 +301,56 @@ impl TopologyBuilder {
&config.wallet_config,
&da_ports,
&blend_ports,
)
.expect("failed to build base configs");
let api_configs = create_api_configs(&ids).expect("failed to create API configs");
)?;
let api_configs = create_api_configs(&ids)?;
let tracing_configs = create_tracing_configs(&ids);
let time_config = default_time_config();
let mut providers: Vec<_> = da_configs
.iter()
.enumerate()
.map(|(i, da_conf)| ProviderInfo {
let first_consensus = consensus_configs
.first()
.ok_or(TopologyBuildError::MissingConsensusConfig)?;
let mut providers = Vec::with_capacity(da_configs.len() + blend_configs.len());
for (i, da_conf) in da_configs.iter().enumerate() {
let note = first_consensus
.da_notes
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "da_notes",
expected: da_configs.len(),
actual: first_consensus.da_notes.len(),
})?
.clone();
providers.push(ProviderInfo {
service_type: ServiceType::DataAvailability,
provider_sk: da_conf.signer.clone(),
zk_sk: da_conf.secret_zk_key.clone(),
locator: Locator(da_conf.listening_address.clone()),
note: consensus_configs[0].da_notes[i].clone(),
})
.collect();
providers.extend(
blend_configs
.iter()
.enumerate()
.map(|(i, blend_conf)| ProviderInfo {
service_type: ServiceType::BlendNetwork,
provider_sk: blend_conf.signer.clone(),
zk_sk: blend_conf.secret_zk_key.clone(),
locator: Locator(blend_conf.backend_core.listening_address.clone()),
note: consensus_configs[0].blend_notes[i].clone(),
}),
);
note,
});
}
for (i, blend_conf) in blend_configs.iter().enumerate() {
let note = first_consensus
.blend_notes
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "blend_notes",
expected: blend_configs.len(),
actual: first_consensus.blend_notes.len(),
})?
.clone();
providers.push(ProviderInfo {
service_type: ServiceType::BlendNetwork,
provider_sk: blend_conf.signer.clone(),
zk_sk: blend_conf.secret_zk_key.clone(),
locator: Locator(blend_conf.backend_core.listening_address.clone()),
note,
});
}
let ledger_tx = consensus_configs[0]
.genesis_tx
.mantle_tx()
.ledger_tx
.clone();
match create_genesis_tx_with_declarations(ledger_tx, providers) {
Ok(genesis_tx) => {
for c in &mut consensus_configs {
c.genesis_tx = genesis_tx.clone();
}
}
Err(err) => {
tracing::error!(error = ?err, "failed to build genesis declarations; using base genesis transaction");
}
let ledger_tx = first_consensus.genesis_tx.mantle_tx().ledger_tx.clone();
let genesis_tx = create_genesis_tx_with_declarations(ledger_tx, providers)?;
for c in &mut consensus_configs {
c.genesis_tx = genesis_tx.clone();
}
let kms_configs =
@ -327,16 +360,100 @@ impl TopologyBuilder {
let mut executors = Vec::with_capacity(config.n_executors);
for i in 0..n_participants {
let consensus_config = consensus_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "consensus_configs",
expected: n_participants,
actual: consensus_configs.len(),
})?
.clone();
let bootstrapping_config = bootstrapping_config
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "bootstrap_configs",
expected: n_participants,
actual: bootstrapping_config.len(),
})?
.clone();
let da_config = da_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "da_configs",
expected: n_participants,
actual: da_configs.len(),
})?
.clone();
let network_config = network_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "network_configs",
expected: n_participants,
actual: network_configs.len(),
})?
.clone();
let blend_config = blend_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "blend_configs",
expected: n_participants,
actual: blend_configs.len(),
})?
.clone();
let api_config = api_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "api_configs",
expected: n_participants,
actual: api_configs.len(),
})?
.clone();
let tracing_config = tracing_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "tracing_configs",
expected: n_participants,
actual: tracing_configs.len(),
})?
.clone();
let kms_config = kms_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "kms_configs",
expected: n_participants,
actual: kms_configs.len(),
})?
.clone();
let id = *ids.get(i).ok_or(TopologyBuildError::VectorLenMismatch {
label: "ids",
expected: n_participants,
actual: ids.len(),
})?;
let da_port = *da_ports
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "da_ports",
expected: n_participants,
actual: da_ports.len(),
})?;
let blend_port = *blend_ports
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "blend_ports",
expected: n_participants,
actual: blend_ports.len(),
})?;
let general = GeneralConfig {
consensus_config: consensus_configs[i].clone(),
bootstrapping_config: bootstrapping_config[i].clone(),
da_config: da_configs[i].clone(),
network_config: network_configs[i].clone(),
blend_config: blend_configs[i].clone(),
api_config: api_configs[i].clone(),
tracing_config: tracing_configs[i].clone(),
consensus_config,
bootstrapping_config,
da_config,
network_config,
blend_config,
api_config,
tracing_config,
time_config: time_config.clone(),
kms_config: kms_configs[i].clone(),
kms_config,
};
let role = if i < config.n_validators {
@ -352,10 +469,10 @@ impl TopologyBuilder {
let descriptor = GeneratedNodeConfig {
role,
index,
id: ids[i],
id,
general,
da_port: da_ports[i],
blend_port: blend_ports[i],
da_port,
blend_port,
};
match role {
@ -364,11 +481,11 @@ impl TopologyBuilder {
}
}
GeneratedTopology {
Ok(GeneratedTopology {
config,
validators,
executors,
}
})
}
#[must_use]

View File

@ -10,7 +10,7 @@ use crate::{
validator::{Validator, create_validator_config},
},
topology::{
config::{TopologyBuilder, TopologyConfig},
config::{TopologyBuildError, TopologyBuilder, TopologyConfig},
configs::GeneralConfig,
generation::find_expected_peer_counts,
readiness::{
@ -31,13 +31,15 @@ pub type DeployedNodes = (Vec<Validator>, Vec<Executor>);
#[derive(Debug, Error)]
pub enum SpawnTopologyError {
#[error(transparent)]
Build(#[from] TopologyBuildError),
#[error(transparent)]
Node(#[from] SpawnNodeError),
}
impl Topology {
pub async fn spawn(config: TopologyConfig) -> Result<Self, SpawnTopologyError> {
let generated = TopologyBuilder::new(config.clone()).build();
let generated = TopologyBuilder::new(config.clone()).build()?;
let n_validators = config.n_validators;
let n_executors = config.n_executors;
let node_configs = generated
@ -64,7 +66,7 @@ impl Topology {
.with_ids(ids.to_vec())
.with_da_ports(da_ports.to_vec())
.with_blend_ports(blend_ports.to_vec())
.build();
.build()?;
let node_configs = generated
.nodes()

View File

@ -4,6 +4,7 @@ use groth16::fr_to_bytes;
use key_management_system_service::{backend::preload::PreloadKMSBackendSettings, keys::Key};
use nomos_utils::net::get_available_udp_port;
use rand::{Rng, thread_rng};
use thiserror::Error;
use crate::topology::configs::{
blend::GeneralBlendConfig, da::GeneralDaConfig, wallet::WalletAccount,
@ -52,40 +53,67 @@ pub fn create_kms_configs(
.collect()
}
pub fn resolve_ids(ids: Option<Vec<[u8; 32]>>, count: usize) -> Vec<[u8; 32]> {
ids.map_or_else(
|| {
#[derive(Debug, Error)]
pub enum TopologyResolveError {
#[error("expected {expected} ids but got {actual}")]
IdCountMismatch { expected: usize, actual: usize },
#[error("expected {expected} {label} ports but got {actual}")]
PortCountMismatch {
label: &'static str,
expected: usize,
actual: usize,
},
#[error("failed to allocate a free UDP port for {label}")]
PortAllocationFailed { label: &'static str },
}
pub fn resolve_ids(
ids: Option<Vec<[u8; 32]>>,
count: usize,
) -> Result<Vec<[u8; 32]>, TopologyResolveError> {
match ids {
Some(ids) => {
if ids.len() != count {
return Err(TopologyResolveError::IdCountMismatch {
expected: count,
actual: ids.len(),
});
}
Ok(ids)
}
None => {
let mut generated = vec![[0; 32]; count];
for id in &mut generated {
thread_rng().fill(id);
}
generated
},
|ids| {
assert_eq!(
ids.len(),
count,
"expected {count} ids but got {}",
ids.len()
);
ids
},
)
Ok(generated)
}
}
}
pub fn resolve_ports(ports: Option<Vec<u16>>, count: usize, label: &str) -> Vec<u16> {
let resolved = ports.unwrap_or_else(|| {
iter::repeat_with(|| get_available_udp_port().unwrap())
.take(count)
.collect()
});
assert_eq!(
resolved.len(),
count,
"expected {count} {label} ports but got {}",
resolved.len()
);
resolved
pub fn resolve_ports(
ports: Option<Vec<u16>>,
count: usize,
label: &'static str,
) -> Result<Vec<u16>, TopologyResolveError> {
let resolved = match ports {
Some(ports) => ports,
None => iter::repeat_with(|| {
get_available_udp_port().ok_or(TopologyResolveError::PortAllocationFailed { label })
})
.take(count)
.collect::<Result<Vec<_>, _>>()?,
};
if resolved.len() != count {
return Err(TopologyResolveError::PortCountMismatch {
label,
expected: count,
actual: resolved.len(),
});
}
Ok(resolved)
}
pub fn multiaddr_port(addr: &nomos_libp2p::Multiaddr) -> Option<u16> {

View File

@ -1,7 +1,9 @@
use std::{env, path::PathBuf, time::Duration};
use cucumber::World;
use testing_framework_core::scenario::{Builder, NodeControlCapability, Scenario, ScenarioBuilder};
use testing_framework_core::scenario::{
Builder, NodeControlCapability, Scenario, ScenarioBuildError, ScenarioBuilder,
};
use testing_framework_workflows::{ScenarioBuilderExt as _, expectations::ConsensusLiveness};
use thiserror::Error;
@ -81,6 +83,11 @@ pub enum StepError {
InvalidArgument { message: String },
#[error("{message}")]
Preflight { message: String },
#[error("failed to build scenario: {source}")]
ScenarioBuild {
#[source]
source: ScenarioBuildError,
},
#[error("{message}")]
RunFailed { message: String },
}
@ -195,14 +202,18 @@ impl TestingFrameworkWorld {
pub fn build_local_scenario(&self) -> Result<Scenario<()>, StepError> {
self.preflight(DeployerKind::Local)?;
let builder = self.make_builder_for_deployer::<()>(DeployerKind::Local)?;
Ok(builder.build())
builder
.build()
.map_err(|source| StepError::ScenarioBuild { source })
}
pub fn build_compose_scenario(&self) -> Result<Scenario<NodeControlCapability>, StepError> {
self.preflight(DeployerKind::Compose)?;
let builder =
self.make_builder_for_deployer::<NodeControlCapability>(DeployerKind::Compose)?;
Ok(builder.build())
builder
.build()
.map_err(|source| StepError::ScenarioBuild { source })
}
pub fn preflight(&self, expected: DeployerKind) -> Result<(), StepError> {

View File

@ -109,7 +109,9 @@ mod tests {
#[test]
fn cfgsync_prebuilt_configs_preserve_genesis() {
let scenario = ScenarioBuilder::topology_with(|t| t.validators(1).executors(1)).build();
let scenario = ScenarioBuilder::topology_with(|t| t.validators(1).executors(1))
.build()
.expect("scenario build should succeed");
let topology = scenario.topology().clone();
let hosts = hosts_from_topology(&topology);
let tracing_settings = tracing_settings(&topology);
@ -161,7 +163,9 @@ mod tests {
#[test]
fn cfgsync_genesis_proofs_verify_against_ledger() {
let scenario = ScenarioBuilder::topology_with(|t| t.validators(1).executors(1)).build();
let scenario = ScenarioBuilder::topology_with(|t| t.validators(1).executors(1))
.build()
.expect("scenario build should succeed");
let topology = scenario.topology().clone();
let hosts = hosts_from_topology(&topology);
let tracing_settings = tracing_settings(&topology);
@ -197,7 +201,9 @@ mod tests {
#[test]
fn cfgsync_docker_overrides_produce_valid_genesis() {
let scenario = ScenarioBuilder::topology_with(|t| t.validators(1).executors(1)).build();
let scenario = ScenarioBuilder::topology_with(|t| t.validators(1).executors(1))
.build()
.expect("scenario build should succeed");
let topology = scenario.topology().clone();
let tracing_settings = tracing_settings(&topology);
let hosts = docker_style_hosts(&topology);
@ -228,7 +234,9 @@ mod tests {
#[test]
fn cfgsync_configs_match_topology_ports_and_genesis() {
let scenario = ScenarioBuilder::topology_with(|t| t.validators(1).executors(1)).build();
let scenario = ScenarioBuilder::topology_with(|t| t.validators(1).executors(1))
.build()
.expect("scenario build should succeed");
let topology = scenario.topology().clone();
let hosts = hosts_from_topology(&topology);
let tracing_settings = tracing_settings(&topology);