Add expectation fail-fast during capture with runner integration

This commit is contained in:
andrussal 2026-02-21 15:28:08 +01:00
parent 406d2bc63f
commit 6910314298
5 changed files with 218 additions and 57 deletions

View File

@ -0,0 +1,65 @@
use std::time::Duration;
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use lb_framework::{CoreBuilderExt as _, LbcEnv, LbcLocalDeployer, ScenarioBuilder};
use testing_framework_core::scenario::{
Deployer, DynError, Expectation, RunContext, ScenarioError,
};
use tracing_subscriber::fmt::try_init;
/// Number of capture checks after which the expectation intentionally fails.
const FAIL_AFTER_CHECKS: usize = 2;

/// Test expectation that deliberately errors after a fixed number of
/// `check_during_capture` invocations, used to exercise the runner's
/// fail-fast path during the capture window.
///
/// `checks` counts capture checks performed so far; the derived `Default`
/// starts it at 0, matching the previous hand-written impl.
#[derive(Default)]
struct FailFastDuringCaptureExpectation {
    checks: usize,
}
#[async_trait]
impl Expectation<LbcEnv> for FailFastDuringCaptureExpectation {
    fn name(&self) -> &str {
        "fail_fast_during_capture"
    }

    /// Counts invocations and deliberately errors once the configured
    /// threshold is reached, triggering the runner's fail-fast handling.
    async fn check_during_capture(&mut self, _ctx: &RunContext<LbcEnv>) -> Result<(), DynError> {
        self.checks += 1;
        if self.checks < FAIL_AFTER_CHECKS {
            return Ok(());
        }
        Err(format!(
            "intentional fail-fast trigger after {} capture checks",
            self.checks
        )
        .into())
    }

    /// End-of-run evaluation is a no-op; the failure is injected during capture.
    async fn evaluate(&mut self, _ctx: &RunContext<LbcEnv>) -> Result<(), DynError> {
        Ok(())
    }
}
#[tokio::test]
#[ignore = "requires local node binary and open ports"]
async fn expectation_can_fail_fast_during_capture() -> Result<()> {
    let _ = try_init();

    // Single-node scenario whose expectation is wired to trip partway
    // through the 30-second capture window.
    let mut scenario = ScenarioBuilder::deployment_with(|topology| topology.with_node_count(1))
        .with_run_duration(Duration::from_secs(30))
        .with_expectation(FailFastDuringCaptureExpectation::default())
        .build()?;

    let runner = LbcLocalDeployer::default().deploy(&scenario).await?;

    // The run must abort early with the dedicated fail-fast error variant;
    // any other outcome (success or a different error) fails the test.
    let outcome = runner.run(&mut scenario).await;
    match outcome {
        Err(ScenarioError::ExpectationFailedDuringCapture(_)) => Ok(()),
        Err(other) => Err(anyhow!("unexpected scenario error: {other}")),
        Ok(_) => Err(anyhow!("expected fail-fast capture error, run succeeded")),
    }
}

View File

@ -19,5 +19,12 @@ pub trait Expectation<E: Application>: Send + Sync {
Ok(())
}
/// Optional periodic check used by fail-fast expectation mode.
///
/// The runner invokes this repeatedly while the capture window is open;
/// returning an error aborts the run early instead of waiting for the
/// end-of-run evaluation.
///
/// Default is a no-op so existing expectations stay end-of-run only.
async fn check_during_capture(&mut self, _ctx: &RunContext<E>) -> Result<(), DynError> {
Ok(())
}
/// Final end-of-run verdict for this expectation; an error marks it failed.
async fn evaluate(&mut self, ctx: &RunContext<E>) -> Result<(), DynError>;
}

View File

@ -10,6 +10,8 @@ pub enum ScenarioError {
Workload(#[source] DynError),
#[error("expectation capture failed: {0}")]
ExpectationCapture(#[source] DynError),
#[error("expectation failed during capture: {0}")]
ExpectationFailedDuringCapture(#[source] DynError),
#[error("expectations failed:\n{0}")]
Expectations(#[source] DynError),
}

View File

@ -1,14 +1,15 @@
use std::{any::Any, future::Future, panic::AssertUnwindSafe, sync::Arc, time::Duration};
use futures::{FutureExt as _, future};
use futures::FutureExt as _;
use tokio::{
task::{JoinError, JoinSet},
time::{sleep, timeout},
time::{Interval, interval, sleep},
};
use tracing::{debug, info, warn};
use super::deployer::ScenarioError;
use crate::scenario::{
Application, DynError, Expectation, Scenario,
Application, DynError, Expectation, Scenario, Workload,
runtime::context::{CleanupGuard, RunContext, RunHandle},
};
@ -17,6 +18,7 @@ type WorkloadOutcome = Result<(), DynError>;
const MIN_NODE_CONTROL_COOLDOWN: Duration = Duration::from_secs(30);
const DEFAULT_BLOCK_FEED_SETTLE_WAIT: Duration = Duration::from_secs(1);
const MIN_BLOCK_FEED_SETTLE_WAIT: Duration = Duration::from_secs(2);
const EXPECTATION_CAPTURE_CHECK_INTERVAL: Duration = Duration::from_secs(1);
const UNKNOWN_PANIC: &str = "<unknown panic>";
/// Represents a fully prepared environment capable of executing a scenario.
@ -60,6 +62,16 @@ impl<E: Application> Runner<E> {
Caps: Send + Sync,
{
let context = self.context();
let run_duration = scenario.duration();
let workloads = scenario.workloads().to_vec();
let expectation_count = scenario.expectations().len();
info!(
run_secs = run_duration.as_secs(),
workloads = workloads.len(),
expectations = expectation_count,
"runner starting scenario execution"
);
self.run_step(Self::prepare_expectations(
scenario.expectations_mut(),
@ -67,8 +79,13 @@ impl<E: Application> Runner<E> {
))
.await?;
self.run_step(Self::run_workloads(Arc::clone(&context), scenario))
.await?;
self.run_step(Self::run_workload_phase(
Arc::clone(&context),
&workloads,
run_duration,
scenario.expectations_mut(),
))
.await?;
Self::settle_before_expectations(context.as_ref()).await;
@ -78,6 +95,8 @@ impl<E: Application> Runner<E> {
))
.await?;
info!("runner finished scenario execution");
Ok(self.into_run_handle())
}
@ -108,32 +127,46 @@ impl<E: Application> Runner<E> {
Ok(())
}
async fn run_workloads<Caps>(
async fn run_workload_phase(
context: Arc<RunContext<E>>,
scenario: &Scenario<E, Caps>,
) -> Result<(), ScenarioError>
where
Caps: Send + Sync,
{
if scenario.workloads().is_empty() {
return idle_until_duration(scenario.duration()).await;
workloads: &[Arc<dyn Workload<E>>],
duration: Duration,
expectations: &mut [Box<dyn Expectation<E>>],
) -> Result<(), ScenarioError> {
info!(
workloads = workloads.len(),
run_secs = duration.as_secs(),
"runner workload phase started"
);
if workloads.is_empty() {
Self::run_idle_window_with_capture_checks(duration, expectations, context.as_ref())
.await?;
info!("runner workload phase completed (idle)");
return Ok(());
}
let mut workloads = Self::spawn_workloads(scenario, Arc::clone(&context));
Self::run_workload_window(&mut workloads, scenario.duration()).await?;
let mut running = Self::spawn_workloads(workloads, Arc::clone(&context));
Self::run_window_until_timeout(&mut running, duration, expectations, context.as_ref())
.await?;
if let Some(cooldown) = nonzero_cooldown(Self::cooldown_duration(context.as_ref())) {
Self::run_workload_window(&mut workloads, cooldown).await?;
info!(
cooldown_secs = cooldown.as_secs(),
"runner cooldown window started"
);
Self::run_window_until_timeout(&mut running, cooldown, expectations, context.as_ref())
.await?;
}
Self::drain_workloads(&mut workloads).await
}
Self::drain_workloads(&mut running).await?;
info!("runner workload phase completed");
async fn run_workload_window(
workloads: &mut JoinSet<WorkloadOutcome>,
duration: Duration,
) -> Result<(), ScenarioError> {
let _completed = Self::drive_until_timer(workloads, duration).await?;
Ok(())
}
@ -203,19 +236,16 @@ impl<E: Application> Runner<E> {
}
/// Spawn each workload in its own task.
fn spawn_workloads<Caps>(
scenario: &Scenario<E, Caps>,
fn spawn_workloads(
workloads: &[Arc<dyn Workload<E>>],
context: Arc<RunContext<E>>,
) -> JoinSet<WorkloadOutcome>
where
Caps: Send + Sync,
{
let mut workloads = JoinSet::new();
for workload in scenario.workloads() {
) -> JoinSet<WorkloadOutcome> {
let mut running = JoinSet::new();
for workload in workloads {
let workload = Arc::clone(workload);
let ctx = Arc::clone(&context);
workloads.spawn(async move {
running.spawn(async move {
// Convert panics into workload errors so the runner can report
// them instead of aborting the process.
let outcome = AssertUnwindSafe(async { workload.start(ctx.as_ref()).await })
@ -228,29 +258,87 @@ impl<E: Application> Runner<E> {
});
}
workloads
running
}
/// Drive workload tasks until timeout or failure.
async fn drive_until_timer(
async fn run_window_until_timeout(
workloads: &mut JoinSet<WorkloadOutcome>,
duration: Duration,
) -> Result<bool, ScenarioError> {
let run_future = async {
while let Some(result) = workloads.join_next().await {
Self::map_join_result(result)?;
expectations: &mut [Box<dyn Expectation<E>>],
context: &RunContext<E>,
) -> Result<(), ScenarioError> {
if duration.is_zero() {
return Ok(());
}
let timer = sleep(duration);
tokio::pin!(timer);
let mut capture_tick = capture_check_interval();
loop {
tokio::select! {
_ = &mut timer => return Ok(()),
_ = capture_tick.tick() => {
Self::run_capture_checks(expectations, context).await?;
}
result = workloads.join_next(), if !workloads.is_empty() => {
let Some(result) = result else {
return Ok(());
};
Self::map_join_result(result)?;
if workloads.is_empty() {
return Ok(());
}
}
}
}
}
Ok(())
};
async fn run_capture_checks(
expectations: &mut [Box<dyn Expectation<E>>],
context: &RunContext<E>,
) -> Result<(), ScenarioError> {
let expectation_count = expectations.len();
match timeout(duration, run_future).await {
Ok(result) => {
result?;
Ok(true)
for expectation in expectations {
if let Err(source) = expectation.check_during_capture(context).await {
warn!(expectation = expectation.name(), %source, "expectation failed during capture");
return Err(capture_check_failure(expectation.name(), source));
}
}
Err(_) => Ok(false),
debug!(
expectations = expectation_count,
"expectation capture check pass"
);
Ok(())
}
/// Waits out `duration` when no workloads are configured, while still
/// driving the periodic `check_during_capture` hooks so fail-fast
/// expectations can abort an otherwise idle run.
async fn run_idle_window_with_capture_checks(
duration: Duration,
expectations: &mut [Box<dyn Expectation<E>>],
context: &RunContext<E>,
) -> Result<(), ScenarioError> {
// A zero-length window completes immediately with no capture checks.
if duration.is_zero() {
return Ok(());
}
let timer = sleep(duration);
tokio::pin!(timer);
let mut capture_tick = capture_check_interval();
loop {
tokio::select! {
// Window elapsed: idle phase finished normally.
_ = &mut timer => return Ok(()),
// Periodic fail-fast pass; an expectation error propagates and
// aborts the run via `?`.
_ = capture_tick.tick() => {
Self::run_capture_checks(expectations, context).await?;
}
}
}
}
@ -274,19 +362,14 @@ impl<E: Application> Runner<E> {
}
}
/// Blocks for exactly `duration` by racing a never-completing future
/// against a timeout; the guaranteed `Elapsed` error is discarded on
/// purpose. A zero duration returns immediately.
async fn idle_until_duration(duration: Duration) -> Result<(), ScenarioError> {
if duration.is_zero() {
return Ok(());
}
let _ = timeout(duration, async { future::pending::<()>().await }).await;
Ok(())
}
/// Returns the cooldown only when it is present and strictly positive;
/// `None` and zero-length cooldowns are both treated as "no cooldown".
fn nonzero_cooldown(cooldown: Option<Duration>) -> Option<Duration> {
    match cooldown {
        Some(window) if !window.is_zero() => Some(window),
        _ => None,
    }
}
/// Ticker driving the periodic `check_during_capture` passes
/// (cadence set by `EXPECTATION_CAPTURE_CHECK_INTERVAL`, 1s).
fn capture_check_interval() -> Interval {
interval(EXPECTATION_CAPTURE_CHECK_INTERVAL)
}
fn panic_message(panic: Box<dyn Any + Send>) -> String {
panic
.downcast_ref::<&str>()
@ -295,6 +378,10 @@ fn panic_message(panic: Box<dyn Any + Send>) -> String {
.unwrap_or_else(|| UNKNOWN_PANIC.to_owned())
}
/// Wraps a failed capture check in the dedicated fail-fast error variant,
/// prefixing the underlying error with the expectation's name.
fn capture_check_failure(expectation: &str, source: DynError) -> ScenarioError {
    let detail = format!("{expectation}: {source}");
    ScenarioError::ExpectationFailedDuringCapture(detail.into())
}
fn expectation_failure_summary(failures: Vec<(String, DynError)>) -> String {
failures
.into_iter()

View File

@ -135,9 +135,9 @@ impl From<ScenarioError> for ProcessDeployerError {
fn from(value: ScenarioError) -> Self {
match value {
ScenarioError::Workload(source) => Self::WorkloadFailed { source },
ScenarioError::ExpectationCapture(source) | ScenarioError::Expectations(source) => {
Self::ExpectationsFailed { source }
}
ScenarioError::ExpectationCapture(source)
| ScenarioError::ExpectationFailedDuringCapture(source)
| ScenarioError::Expectations(source) => Self::ExpectationsFailed { source },
}
}
}