// Patch overview (descriptive preamble placed before the first file header; the patch text below is unchanged).
//
// Purpose: let an expectation abort a scenario run while workloads are still running ("fail-fast"),
// instead of only being evaluated at the end of the run.
//
// Files touched:
//   * logos/examples/tests/expectation_fail_fast_capture.rs (new): an ignored tokio test whose expectation
//     returns an error from its second `check_during_capture` call (FAIL_AFTER_CHECKS = 2) and which passes
//     only if `runner.run` yields `ScenarioError::ExpectationFailedDuringCapture`.
//   * testing-framework/core/src/scenario/expectation.rs: adds `Expectation::check_during_capture` with a
//     default body of `Ok(())`, so existing implementors need no change.
//   * testing-framework/core/src/scenario/runtime/deployer.rs: adds the
//     `ScenarioError::ExpectationFailedDuringCapture` variant.
//   * testing-framework/core/src/scenario/runtime/runner.rs: replaces `run_workloads`, `run_workload_window`,
//     `drive_until_timer` and `idle_until_duration` with `run_workload_phase`, `run_window_until_timeout`,
//     `run_capture_checks` and `run_idle_window_with_capture_checks`. Each window is a `tokio::select!` loop
//     over a `sleep(duration)` timer, a 1-second interval tick (EXPECTATION_CAPTURE_CHECK_INTERVAL) that runs
//     every expectation's `check_during_capture`, and (non-idle windows only) `JoinSet::join_next`, which is
//     guarded by `!workloads.is_empty()`. The first failing check is logged with `warn!` and returned as
//     `ExpectationFailedDuringCapture("<expectation name>: <source>")`. Also adds `info!`/`debug!` logging.
//   * testing-framework/deployers/local/src/deployer.rs: maps the new variant to
//     `ProcessDeployerError::ExpectationsFailed`.
//
// NOTE(review): this copy of the patch appears to have lost its line breaks and several `<...>` generic
//   argument lists (e.g. `Arc>`, `&[Arc>]`, `impl From for ProcessDeployerError`, `Option`, `Box`, and the
//   empty UNKNOWN_PANIC literal) -- presumably an extraction artifact. It will not apply or compile in this
//   form; confirm against the original commit before acting on any detail below.
// NOTE(review): `capture_check_interval()` calls `tokio::time::interval` without setting a missed-tick
//   behavior. Per the Tokio docs the first tick completes immediately and the default behavior is burst, so
//   (a) a capture check runs at the very start of every window, including the cooldown window, and (b) a
//   check pass that takes longer than one second would be followed by back-to-back catch-up passes.
//   Consider `MissedTickBehavior::Delay` (or `Skip`) -- confirm the desired cadence.
// NOTE(review): `run_window_until_timeout` returns `Ok(())` as soon as the JoinSet drains, so the rest of
//   that window (and its capture checks) is skipped; but when it is entered with an already-empty set the
//   `join_next` arm is disabled and it waits for the full duration. The removed `drive_until_timer` returned
//   immediately in the empty-set case. Confirm that both behaviors are intended.
// NOTE(review): `run_idle_window_with_capture_checks` repeats the timer and tick arms of
//   `run_window_until_timeout`; because the `join_next` arm is disabled for an empty set, the two loops look
//   mergeable -- verify before deduplicating.
// NOTE(review): `capture_check_failure` formats the source error into a new string, so the original error
//   value is not kept as the `#[source]` of the returned variant.
// NOTE(review): the hand-written `Default` impl in the new test only sets `checks: 0`; `#[derive(Default)]`
//   looks equivalent.
//
diff --git a/logos/examples/tests/expectation_fail_fast_capture.rs b/logos/examples/tests/expectation_fail_fast_capture.rs new file mode 100644 index 0000000..0b0be89 --- /dev/null +++ b/logos/examples/tests/expectation_fail_fast_capture.rs @@ -0,0 +1,65 @@ +use std::time::Duration; + +use anyhow::{Result, anyhow}; +use async_trait::async_trait; +use lb_framework::{CoreBuilderExt as _, LbcEnv, LbcLocalDeployer, ScenarioBuilder}; +use testing_framework_core::scenario::{ + Deployer, DynError, Expectation, RunContext, ScenarioError, +}; +use tracing_subscriber::fmt::try_init; + +const FAIL_AFTER_CHECKS: usize = 2; + +struct FailFastDuringCaptureExpectation { + checks: usize, +} + +impl Default for FailFastDuringCaptureExpectation { + fn default() -> Self { + Self { checks: 0 } + } +} + +#[async_trait] +impl Expectation for FailFastDuringCaptureExpectation { + fn name(&self) -> &str { + "fail_fast_during_capture" + } + + async fn check_during_capture(&mut self, _ctx: &RunContext) -> Result<(), DynError> { + self.checks += 1; + if self.checks >= FAIL_AFTER_CHECKS { + return Err(format!( + "intentional fail-fast trigger after {} capture checks", + self.checks + ) + .into()); + } + + Ok(()) + } + + async fn evaluate(&mut self, _ctx: &RunContext) -> Result<(), DynError> { + Ok(()) + } +} + +#[tokio::test] +#[ignore = "requires local node binary and open ports"] +async fn expectation_can_fail_fast_during_capture() -> Result<()> { + let _ = try_init(); + + let mut scenario = ScenarioBuilder::deployment_with(|topology| topology.with_node_count(1)) + .with_run_duration(Duration::from_secs(30)) + .with_expectation(FailFastDuringCaptureExpectation::default()) + .build()?; + + let deployer = LbcLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + + match runner.run(&mut scenario).await { + Err(ScenarioError::ExpectationFailedDuringCapture(_)) => Ok(()), + Err(other) => Err(anyhow!("unexpected scenario error: {other}")), + Ok(_) => Err(anyhow!("expected 
fail-fast capture error, run succeeded")), + } +} diff --git a/testing-framework/core/src/scenario/expectation.rs b/testing-framework/core/src/scenario/expectation.rs index c304582..f70f011 100644 --- a/testing-framework/core/src/scenario/expectation.rs +++ b/testing-framework/core/src/scenario/expectation.rs @@ -19,5 +19,12 @@ pub trait Expectation: Send + Sync { Ok(()) } + /// Optional periodic check used by fail-fast expectation mode. + /// + /// Default is a no-op so existing expectations stay end-of-run only. + async fn check_during_capture(&mut self, _ctx: &RunContext) -> Result<(), DynError> { + Ok(()) + } + async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError>; } diff --git a/testing-framework/core/src/scenario/runtime/deployer.rs b/testing-framework/core/src/scenario/runtime/deployer.rs index 0151327..d6ee181 100644 --- a/testing-framework/core/src/scenario/runtime/deployer.rs +++ b/testing-framework/core/src/scenario/runtime/deployer.rs @@ -10,6 +10,8 @@ pub enum ScenarioError { Workload(#[source] DynError), #[error("expectation capture failed: {0}")] ExpectationCapture(#[source] DynError), + #[error("expectation failed during capture: {0}")] + ExpectationFailedDuringCapture(#[source] DynError), #[error("expectations failed:\n{0}")] Expectations(#[source] DynError), } diff --git a/testing-framework/core/src/scenario/runtime/runner.rs b/testing-framework/core/src/scenario/runtime/runner.rs index c1502d0..bcc453b 100644 --- a/testing-framework/core/src/scenario/runtime/runner.rs +++ b/testing-framework/core/src/scenario/runtime/runner.rs @@ -1,14 +1,15 @@ use std::{any::Any, future::Future, panic::AssertUnwindSafe, sync::Arc, time::Duration}; -use futures::{FutureExt as _, future}; +use futures::FutureExt as _; use tokio::{ task::{JoinError, JoinSet}, - time::{sleep, timeout}, + time::{Interval, interval, sleep}, }; +use tracing::{debug, info, warn}; use super::deployer::ScenarioError; use crate::scenario::{ - Application, DynError, 
Expectation, Scenario, + Application, DynError, Expectation, Scenario, Workload, runtime::context::{CleanupGuard, RunContext, RunHandle}, }; @@ -17,6 +18,7 @@ type WorkloadOutcome = Result<(), DynError>; const MIN_NODE_CONTROL_COOLDOWN: Duration = Duration::from_secs(30); const DEFAULT_BLOCK_FEED_SETTLE_WAIT: Duration = Duration::from_secs(1); const MIN_BLOCK_FEED_SETTLE_WAIT: Duration = Duration::from_secs(2); +const EXPECTATION_CAPTURE_CHECK_INTERVAL: Duration = Duration::from_secs(1); const UNKNOWN_PANIC: &str = ""; /// Represents a fully prepared environment capable of executing a scenario. @@ -60,6 +62,16 @@ impl Runner { Caps: Send + Sync, { let context = self.context(); + let run_duration = scenario.duration(); + let workloads = scenario.workloads().to_vec(); + let expectation_count = scenario.expectations().len(); + + info!( + run_secs = run_duration.as_secs(), + workloads = workloads.len(), + expectations = expectation_count, + "runner starting scenario execution" + ); self.run_step(Self::prepare_expectations( scenario.expectations_mut(), @@ -67,8 +79,13 @@ impl Runner { )) .await?; - self.run_step(Self::run_workloads(Arc::clone(&context), scenario)) - .await?; + self.run_step(Self::run_workload_phase( + Arc::clone(&context), + &workloads, + run_duration, + scenario.expectations_mut(), + )) + .await?; Self::settle_before_expectations(context.as_ref()).await; @@ -78,6 +95,8 @@ impl Runner { )) .await?; + info!("runner finished scenario execution"); + Ok(self.into_run_handle()) } @@ -108,32 +127,46 @@ impl Runner { Ok(()) } - async fn run_workloads( + async fn run_workload_phase( context: Arc>, - scenario: &Scenario, - ) -> Result<(), ScenarioError> - where - Caps: Send + Sync, - { - if scenario.workloads().is_empty() { - return idle_until_duration(scenario.duration()).await; + workloads: &[Arc>], + duration: Duration, + expectations: &mut [Box>], + ) -> Result<(), ScenarioError> { + info!( + workloads = workloads.len(), + run_secs = duration.as_secs(), + 
"runner workload phase started" + ); + + if workloads.is_empty() { + Self::run_idle_window_with_capture_checks(duration, expectations, context.as_ref()) + .await?; + + info!("runner workload phase completed (idle)"); + + return Ok(()); } - let mut workloads = Self::spawn_workloads(scenario, Arc::clone(&context)); - Self::run_workload_window(&mut workloads, scenario.duration()).await?; + let mut running = Self::spawn_workloads(workloads, Arc::clone(&context)); + + Self::run_window_until_timeout(&mut running, duration, expectations, context.as_ref()) + .await?; if let Some(cooldown) = nonzero_cooldown(Self::cooldown_duration(context.as_ref())) { - Self::run_workload_window(&mut workloads, cooldown).await?; + info!( + cooldown_secs = cooldown.as_secs(), + "runner cooldown window started" + ); + + Self::run_window_until_timeout(&mut running, cooldown, expectations, context.as_ref()) + .await?; } - Self::drain_workloads(&mut workloads).await - } + Self::drain_workloads(&mut running).await?; + + info!("runner workload phase completed"); - async fn run_workload_window( - workloads: &mut JoinSet, - duration: Duration, - ) -> Result<(), ScenarioError> { - let _completed = Self::drive_until_timer(workloads, duration).await?; Ok(()) } @@ -203,19 +236,16 @@ impl Runner { } /// Spawn each workload in its own task. - fn spawn_workloads( - scenario: &Scenario, + fn spawn_workloads( + workloads: &[Arc>], context: Arc>, - ) -> JoinSet - where - Caps: Send + Sync, - { - let mut workloads = JoinSet::new(); - for workload in scenario.workloads() { + ) -> JoinSet { + let mut running = JoinSet::new(); + for workload in workloads { let workload = Arc::clone(workload); let ctx = Arc::clone(&context); - workloads.spawn(async move { + running.spawn(async move { // Convert panics into workload errors so the runner can report // them instead of aborting the process. 
let outcome = AssertUnwindSafe(async { workload.start(ctx.as_ref()).await }) @@ -228,29 +258,87 @@ impl Runner { }); } - workloads + running } /// Drive workload tasks until timeout or failure. - async fn drive_until_timer( + async fn run_window_until_timeout( workloads: &mut JoinSet, duration: Duration, - ) -> Result { - let run_future = async { - while let Some(result) = workloads.join_next().await { - Self::map_join_result(result)?; + expectations: &mut [Box>], + context: &RunContext, + ) -> Result<(), ScenarioError> { + if duration.is_zero() { + return Ok(()); + } + + let timer = sleep(duration); + tokio::pin!(timer); + let mut capture_tick = capture_check_interval(); + + loop { + tokio::select! { + _ = &mut timer => return Ok(()), + _ = capture_tick.tick() => { + Self::run_capture_checks(expectations, context).await?; + } + result = workloads.join_next(), if !workloads.is_empty() => { + let Some(result) = result else { + return Ok(()); + }; + + Self::map_join_result(result)?; + + if workloads.is_empty() { + return Ok(()); + } + } } + } + } - Ok(()) - }; + async fn run_capture_checks( + expectations: &mut [Box>], + context: &RunContext, + ) -> Result<(), ScenarioError> { + let expectation_count = expectations.len(); - match timeout(duration, run_future).await { - Ok(result) => { - result?; - Ok(true) + for expectation in expectations { + if let Err(source) = expectation.check_during_capture(context).await { + warn!(expectation = expectation.name(), %source, "expectation failed during capture"); + + return Err(capture_check_failure(expectation.name(), source)); } + } - Err(_) => Ok(false), + debug!( + expectations = expectation_count, + "expectation capture check pass" + ); + + Ok(()) + } + + async fn run_idle_window_with_capture_checks( + duration: Duration, + expectations: &mut [Box>], + context: &RunContext, + ) -> Result<(), ScenarioError> { + if duration.is_zero() { + return Ok(()); + } + + let timer = sleep(duration); + tokio::pin!(timer); + let mut 
capture_tick = capture_check_interval(); + + loop { + tokio::select! { + _ = &mut timer => return Ok(()), + _ = capture_tick.tick() => { + Self::run_capture_checks(expectations, context).await?; + } + } } } @@ -274,19 +362,14 @@ impl Runner { } } -async fn idle_until_duration(duration: Duration) -> Result<(), ScenarioError> { - if duration.is_zero() { - return Ok(()); - } - - let _ = timeout(duration, async { future::pending::<()>().await }).await; - Ok(()) -} - fn nonzero_cooldown(cooldown: Option) -> Option { cooldown.filter(|duration| !duration.is_zero()) } +fn capture_check_interval() -> Interval { + interval(EXPECTATION_CAPTURE_CHECK_INTERVAL) +} + fn panic_message(panic: Box) -> String { panic .downcast_ref::<&str>() @@ -295,6 +378,10 @@ fn panic_message(panic: Box) -> String { .unwrap_or_else(|| UNKNOWN_PANIC.to_owned()) } +fn capture_check_failure(expectation: &str, source: DynError) -> ScenarioError { + ScenarioError::ExpectationFailedDuringCapture(format!("{expectation}: {source}").into()) +} + fn expectation_failure_summary(failures: Vec<(String, DynError)>) -> String { failures .into_iter() diff --git a/testing-framework/deployers/local/src/deployer.rs b/testing-framework/deployers/local/src/deployer.rs index 972769d..536e13e 100644 --- a/testing-framework/deployers/local/src/deployer.rs +++ b/testing-framework/deployers/local/src/deployer.rs @@ -126,9 +126,9 @@ impl From for ProcessDeployerError { fn from(value: ScenarioError) -> Self { match value { ScenarioError::Workload(source) => Self::WorkloadFailed { source }, - ScenarioError::ExpectationCapture(source) | ScenarioError::Expectations(source) => { - Self::ExpectationsFailed { source } - } + ScenarioError::ExpectationCapture(source) + | ScenarioError::ExpectationFailedDuringCapture(source) + | ScenarioError::Expectations(source) => Self::ExpectationsFailed { source }, } } }