core: make scenario initialization fallible

This commit is contained in:
andrussal 2025-12-18 22:52:53 +01:00
parent 47b4e8531d
commit ecdbb3a171

View File

@ -4,7 +4,7 @@ use thiserror::Error;
use tracing::{debug, info};
use super::{
NodeControlCapability, expectation::Expectation, runtime::context::RunMetrics,
DynError, NodeControlCapability, expectation::Expectation, runtime::context::RunMetrics,
workload::Workload,
};
use crate::topology::{
@ -21,6 +21,14 @@ const MIN_EXPECTATION_FALLBACK_SECS: u64 = 10;
pub enum ScenarioBuildError {
#[error(transparent)]
Topology(#[from] TopologyBuildError),
#[error("wallet user count must be non-zero (got {users})")]
WalletUsersZero { users: usize },
#[error("wallet funds overflow for {users} users at {per_wallet} per wallet")]
WalletFundsOverflow { users: usize, per_wallet: u64 },
#[error("workload '{name}' failed to initialize")]
WorkloadInit { name: String, source: DynError },
#[error("expectation '{name}' failed to initialize")]
ExpectationInit { name: String, source: DynError },
}
/// Immutable scenario definition shared between the runner, workloads, and
@ -84,9 +92,10 @@ impl<Caps> Scenario<Caps> {
/// Builder used by callers to describe the desired scenario.
pub struct Builder<Caps = ()> {
topology: TopologyBuilder,
workloads: Vec<Arc<dyn Workload>>,
workloads: Vec<Box<dyn Workload>>,
expectations: Vec<Box<dyn Expectation>>,
duration: Duration,
wallet_users: Option<usize>,
capabilities: Caps,
}
@ -109,6 +118,7 @@ impl<Caps: Default> Builder<Caps> {
workloads: Vec::new(),
expectations: Vec::new(),
duration: Duration::ZERO,
wallet_users: None,
capabilities: Caps::default(),
}
}
@ -145,6 +155,7 @@ impl<Caps> Builder<Caps> {
workloads,
expectations,
duration,
wallet_users,
..
} = self;
@ -153,6 +164,7 @@ impl<Caps> Builder<Caps> {
workloads,
expectations,
duration,
wallet_users,
capabilities,
}
}
@ -173,7 +185,7 @@ impl<Caps> Builder<Caps> {
W: Workload + 'static,
{
self.expectations.extend(workload.expectations());
self.workloads.push(Arc::new(workload));
self.workloads.push(Box::new(workload));
self
}
@ -205,17 +217,15 @@ impl<Caps> Builder<Caps> {
/// Override wallet config for the topology.
pub fn with_wallet_config(mut self, wallet: WalletConfig) -> Self {
self.topology = self.topology.with_wallet_config(wallet);
self.wallet_users = None;
self
}
#[must_use]
pub fn wallets(self, users: usize) -> Self {
let user_count = NonZeroUsize::new(users).expect("wallet user count must be non-zero");
let total_funds = DEFAULT_FUNDS_PER_WALLET
.checked_mul(users as u64)
.expect("wallet count exceeds capacity");
let wallet = WalletConfig::uniform(total_funds, user_count);
self.with_wallet_config(wallet)
let mut builder = self;
builder.wallet_users = Some(users);
builder
}
#[must_use]
@ -223,18 +233,34 @@ impl<Caps> Builder<Caps> {
/// components.
pub fn build(self) -> Result<Scenario<Caps>, ScenarioBuildError> {
let Self {
topology,
mut topology,
mut workloads,
mut expectations,
duration,
wallet_users,
capabilities,
..
} = self;
if let Some(users) = wallet_users {
let user_count =
NonZeroUsize::new(users).ok_or(ScenarioBuildError::WalletUsersZero { users })?;
let total_funds = DEFAULT_FUNDS_PER_WALLET.checked_mul(users as u64).ok_or(
ScenarioBuildError::WalletFundsOverflow {
users,
per_wallet: DEFAULT_FUNDS_PER_WALLET,
},
)?;
let wallet = WalletConfig::uniform(total_funds, user_count);
topology = topology.with_wallet_config(wallet);
}
let generated = topology.build()?;
let duration = enforce_min_duration(&generated, duration);
let run_metrics = RunMetrics::from_topology(&generated, duration);
initialize_components(&generated, &run_metrics, &mut workloads, &mut expectations);
initialize_components(&generated, &run_metrics, &mut workloads, &mut expectations)?;
let workloads: Vec<Arc<dyn Workload>> = workloads.into_iter().map(Arc::from).collect();
info!(
validators = generated.validators().len(),
@ -289,12 +315,6 @@ impl<Caps> TopologyConfigurator<Caps> {
/// Finalize and return the underlying scenario builder.
#[must_use]
pub fn apply(self) -> Builder<Caps> {
let participants = self.validators + self.executors;
assert!(
participants > 0,
"topology must include at least one node; call validators()/executors() before apply()"
);
let mut config = TopologyConfig::with_node_numbers(self.validators, self.executors);
if self.network_star {
config.network_params.libp2p_network_layout = Libp2pNetworkLayout::Star;
@ -316,43 +336,46 @@ impl Builder<()> {
fn initialize_components(
descriptors: &GeneratedTopology,
run_metrics: &RunMetrics,
workloads: &mut [Arc<dyn Workload>],
workloads: &mut [Box<dyn Workload>],
expectations: &mut [Box<dyn Expectation>],
) {
initialize_workloads(descriptors, run_metrics, workloads);
initialize_expectations(descriptors, run_metrics, expectations);
) -> Result<(), ScenarioBuildError> {
initialize_workloads(descriptors, run_metrics, workloads)?;
initialize_expectations(descriptors, run_metrics, expectations)?;
Ok(())
}
fn initialize_workloads(
descriptors: &GeneratedTopology,
run_metrics: &RunMetrics,
workloads: &mut [Arc<dyn Workload>],
) {
workloads: &mut [Box<dyn Workload>],
) -> Result<(), ScenarioBuildError> {
for workload in workloads {
let inner =
Arc::get_mut(workload).expect("workload unexpectedly cloned before initialization");
debug!(workload = inner.name(), "initializing workload");
if let Err(err) = inner.init(descriptors, run_metrics) {
panic!("workload '{}' failed to initialize: {err}", inner.name());
}
debug!(workload = workload.name(), "initializing workload");
workload.init(descriptors, run_metrics).map_err(|source| {
ScenarioBuildError::WorkloadInit {
name: workload.name().to_owned(),
source,
}
})?;
}
Ok(())
}
fn initialize_expectations(
descriptors: &GeneratedTopology,
run_metrics: &RunMetrics,
expectations: &mut [Box<dyn Expectation>],
) {
) -> Result<(), ScenarioBuildError> {
for expectation in expectations {
debug!(expectation = expectation.name(), "initializing expectation");
if let Err(err) = expectation.init(descriptors, run_metrics) {
panic!(
"expectation '{}' failed to initialize: {err}",
expectation.name()
);
}
expectation
.init(descriptors, run_metrics)
.map_err(|source| ScenarioBuildError::ExpectationInit {
name: expectation.name().to_owned(),
source,
})?;
}
Ok(())
}
fn enforce_min_duration(descriptors: &GeneratedTopology, requested: Duration) -> Duration {