mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-02-23 22:53:13 +00:00
Add expectation fail-fast during capture with runner integration
This commit is contained in:
parent
870885b4eb
commit
74e6ef5fd0
65
logos/examples/tests/expectation_fail_fast_capture.rs
Normal file
65
logos/examples/tests/expectation_fail_fast_capture.rs
Normal file
@ -0,0 +1,65 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Result, anyhow};
|
||||
use async_trait::async_trait;
|
||||
use lb_framework::{CoreBuilderExt as _, LbcEnv, LbcLocalDeployer, ScenarioBuilder};
|
||||
use testing_framework_core::scenario::{
|
||||
Deployer, DynError, Expectation, RunContext, ScenarioError,
|
||||
};
|
||||
use tracing_subscriber::fmt::try_init;
|
||||
|
||||
/// Number of `check_during_capture` calls after which the test expectation
/// starts returning an error (the check count is compared with `>=`).
const FAIL_AFTER_CHECKS: usize = 2;
|
||||
|
||||
/// Test-only expectation that counts how many capture checks it has seen.
///
/// The counter starts at zero; `Default` is derived because the default
/// value is exactly the zero-initialised struct.
#[derive(Default)]
struct FailFastDuringCaptureExpectation {
    /// Capture checks observed so far.
    checks: usize,
}
|
||||
|
||||
#[async_trait]
|
||||
impl Expectation<LbcEnv> for FailFastDuringCaptureExpectation {
|
||||
fn name(&self) -> &str {
|
||||
"fail_fast_during_capture"
|
||||
}
|
||||
|
||||
async fn check_during_capture(&mut self, _ctx: &RunContext<LbcEnv>) -> Result<(), DynError> {
|
||||
self.checks += 1;
|
||||
if self.checks >= FAIL_AFTER_CHECKS {
|
||||
return Err(format!(
|
||||
"intentional fail-fast trigger after {} capture checks",
|
||||
self.checks
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn evaluate(&mut self, _ctx: &RunContext<LbcEnv>) -> Result<(), DynError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore = "requires local node binary and open ports"]
|
||||
async fn expectation_can_fail_fast_during_capture() -> Result<()> {
|
||||
let _ = try_init();
|
||||
|
||||
let mut scenario = ScenarioBuilder::deployment_with(|topology| topology.with_node_count(1))
|
||||
.with_run_duration(Duration::from_secs(30))
|
||||
.with_expectation(FailFastDuringCaptureExpectation::default())
|
||||
.build()?;
|
||||
|
||||
let deployer = LbcLocalDeployer::default();
|
||||
let runner = deployer.deploy(&scenario).await?;
|
||||
|
||||
match runner.run(&mut scenario).await {
|
||||
Err(ScenarioError::ExpectationFailedDuringCapture(_)) => Ok(()),
|
||||
Err(other) => Err(anyhow!("unexpected scenario error: {other}")),
|
||||
Ok(_) => Err(anyhow!("expected fail-fast capture error, run succeeded")),
|
||||
}
|
||||
}
|
||||
@ -19,5 +19,12 @@ pub trait Expectation<E: Application>: Send + Sync {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Optional periodic check used by fail-fast expectation mode.
|
||||
///
|
||||
/// Default is a no-op so existing expectations stay end-of-run only.
|
||||
async fn check_during_capture(&mut self, _ctx: &RunContext<E>) -> Result<(), DynError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn evaluate(&mut self, ctx: &RunContext<E>) -> Result<(), DynError>;
|
||||
}
|
||||
|
||||
@ -10,6 +10,8 @@ pub enum ScenarioError {
|
||||
Workload(#[source] DynError),
|
||||
#[error("expectation capture failed: {0}")]
|
||||
ExpectationCapture(#[source] DynError),
|
||||
#[error("expectation failed during capture: {0}")]
|
||||
ExpectationFailedDuringCapture(#[source] DynError),
|
||||
#[error("expectations failed:\n{0}")]
|
||||
Expectations(#[source] DynError),
|
||||
}
|
||||
|
||||
@ -1,14 +1,15 @@
|
||||
use std::{any::Any, future::Future, panic::AssertUnwindSafe, sync::Arc, time::Duration};
|
||||
|
||||
use futures::{FutureExt as _, future};
|
||||
use futures::FutureExt as _;
|
||||
use tokio::{
|
||||
task::{JoinError, JoinSet},
|
||||
time::{sleep, timeout},
|
||||
time::{Interval, interval, sleep},
|
||||
};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use super::deployer::ScenarioError;
|
||||
use crate::scenario::{
|
||||
Application, DynError, Expectation, Scenario,
|
||||
Application, DynError, Expectation, Scenario, Workload,
|
||||
runtime::context::{CleanupGuard, RunContext, RunHandle},
|
||||
};
|
||||
|
||||
@ -17,6 +18,7 @@ type WorkloadOutcome = Result<(), DynError>;
|
||||
const MIN_NODE_CONTROL_COOLDOWN: Duration = Duration::from_secs(30);
|
||||
const DEFAULT_BLOCK_FEED_SETTLE_WAIT: Duration = Duration::from_secs(1);
|
||||
const MIN_BLOCK_FEED_SETTLE_WAIT: Duration = Duration::from_secs(2);
|
||||
const EXPECTATION_CAPTURE_CHECK_INTERVAL: Duration = Duration::from_secs(1);
|
||||
const UNKNOWN_PANIC: &str = "<unknown panic>";
|
||||
|
||||
/// Represents a fully prepared environment capable of executing a scenario.
|
||||
@ -60,6 +62,16 @@ impl<E: Application> Runner<E> {
|
||||
Caps: Send + Sync,
|
||||
{
|
||||
let context = self.context();
|
||||
let run_duration = scenario.duration();
|
||||
let workloads = scenario.workloads().to_vec();
|
||||
let expectation_count = scenario.expectations().len();
|
||||
|
||||
info!(
|
||||
run_secs = run_duration.as_secs(),
|
||||
workloads = workloads.len(),
|
||||
expectations = expectation_count,
|
||||
"runner starting scenario execution"
|
||||
);
|
||||
|
||||
self.run_step(Self::prepare_expectations(
|
||||
scenario.expectations_mut(),
|
||||
@ -67,8 +79,13 @@ impl<E: Application> Runner<E> {
|
||||
))
|
||||
.await?;
|
||||
|
||||
self.run_step(Self::run_workloads(Arc::clone(&context), scenario))
|
||||
.await?;
|
||||
self.run_step(Self::run_workload_phase(
|
||||
Arc::clone(&context),
|
||||
&workloads,
|
||||
run_duration,
|
||||
scenario.expectations_mut(),
|
||||
))
|
||||
.await?;
|
||||
|
||||
Self::settle_before_expectations(context.as_ref()).await;
|
||||
|
||||
@ -78,6 +95,8 @@ impl<E: Application> Runner<E> {
|
||||
))
|
||||
.await?;
|
||||
|
||||
info!("runner finished scenario execution");
|
||||
|
||||
Ok(self.into_run_handle())
|
||||
}
|
||||
|
||||
@ -108,32 +127,46 @@ impl<E: Application> Runner<E> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_workloads<Caps>(
|
||||
async fn run_workload_phase(
|
||||
context: Arc<RunContext<E>>,
|
||||
scenario: &Scenario<E, Caps>,
|
||||
) -> Result<(), ScenarioError>
|
||||
where
|
||||
Caps: Send + Sync,
|
||||
{
|
||||
if scenario.workloads().is_empty() {
|
||||
return idle_until_duration(scenario.duration()).await;
|
||||
workloads: &[Arc<dyn Workload<E>>],
|
||||
duration: Duration,
|
||||
expectations: &mut [Box<dyn Expectation<E>>],
|
||||
) -> Result<(), ScenarioError> {
|
||||
info!(
|
||||
workloads = workloads.len(),
|
||||
run_secs = duration.as_secs(),
|
||||
"runner workload phase started"
|
||||
);
|
||||
|
||||
if workloads.is_empty() {
|
||||
Self::run_idle_window_with_capture_checks(duration, expectations, context.as_ref())
|
||||
.await?;
|
||||
|
||||
info!("runner workload phase completed (idle)");
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut workloads = Self::spawn_workloads(scenario, Arc::clone(&context));
|
||||
Self::run_workload_window(&mut workloads, scenario.duration()).await?;
|
||||
let mut running = Self::spawn_workloads(workloads, Arc::clone(&context));
|
||||
|
||||
Self::run_window_until_timeout(&mut running, duration, expectations, context.as_ref())
|
||||
.await?;
|
||||
|
||||
if let Some(cooldown) = nonzero_cooldown(Self::cooldown_duration(context.as_ref())) {
|
||||
Self::run_workload_window(&mut workloads, cooldown).await?;
|
||||
info!(
|
||||
cooldown_secs = cooldown.as_secs(),
|
||||
"runner cooldown window started"
|
||||
);
|
||||
|
||||
Self::run_window_until_timeout(&mut running, cooldown, expectations, context.as_ref())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Self::drain_workloads(&mut workloads).await
|
||||
}
|
||||
Self::drain_workloads(&mut running).await?;
|
||||
|
||||
info!("runner workload phase completed");
|
||||
|
||||
async fn run_workload_window(
|
||||
workloads: &mut JoinSet<WorkloadOutcome>,
|
||||
duration: Duration,
|
||||
) -> Result<(), ScenarioError> {
|
||||
let _completed = Self::drive_until_timer(workloads, duration).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -203,19 +236,16 @@ impl<E: Application> Runner<E> {
|
||||
}
|
||||
|
||||
/// Spawn each workload in its own task.
|
||||
fn spawn_workloads<Caps>(
|
||||
scenario: &Scenario<E, Caps>,
|
||||
fn spawn_workloads(
|
||||
workloads: &[Arc<dyn Workload<E>>],
|
||||
context: Arc<RunContext<E>>,
|
||||
) -> JoinSet<WorkloadOutcome>
|
||||
where
|
||||
Caps: Send + Sync,
|
||||
{
|
||||
let mut workloads = JoinSet::new();
|
||||
for workload in scenario.workloads() {
|
||||
) -> JoinSet<WorkloadOutcome> {
|
||||
let mut running = JoinSet::new();
|
||||
for workload in workloads {
|
||||
let workload = Arc::clone(workload);
|
||||
let ctx = Arc::clone(&context);
|
||||
|
||||
workloads.spawn(async move {
|
||||
running.spawn(async move {
|
||||
// Convert panics into workload errors so the runner can report
|
||||
// them instead of aborting the process.
|
||||
let outcome = AssertUnwindSafe(async { workload.start(ctx.as_ref()).await })
|
||||
@ -228,29 +258,87 @@ impl<E: Application> Runner<E> {
|
||||
});
|
||||
}
|
||||
|
||||
workloads
|
||||
running
|
||||
}
|
||||
|
||||
/// Drive workload tasks until timeout or failure.
|
||||
async fn drive_until_timer(
|
||||
async fn run_window_until_timeout(
|
||||
workloads: &mut JoinSet<WorkloadOutcome>,
|
||||
duration: Duration,
|
||||
) -> Result<bool, ScenarioError> {
|
||||
let run_future = async {
|
||||
while let Some(result) = workloads.join_next().await {
|
||||
Self::map_join_result(result)?;
|
||||
expectations: &mut [Box<dyn Expectation<E>>],
|
||||
context: &RunContext<E>,
|
||||
) -> Result<(), ScenarioError> {
|
||||
if duration.is_zero() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let timer = sleep(duration);
|
||||
tokio::pin!(timer);
|
||||
let mut capture_tick = capture_check_interval();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut timer => return Ok(()),
|
||||
_ = capture_tick.tick() => {
|
||||
Self::run_capture_checks(expectations, context).await?;
|
||||
}
|
||||
result = workloads.join_next(), if !workloads.is_empty() => {
|
||||
let Some(result) = result else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
Self::map_join_result(result)?;
|
||||
|
||||
if workloads.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
};
|
||||
async fn run_capture_checks(
|
||||
expectations: &mut [Box<dyn Expectation<E>>],
|
||||
context: &RunContext<E>,
|
||||
) -> Result<(), ScenarioError> {
|
||||
let expectation_count = expectations.len();
|
||||
|
||||
match timeout(duration, run_future).await {
|
||||
Ok(result) => {
|
||||
result?;
|
||||
Ok(true)
|
||||
for expectation in expectations {
|
||||
if let Err(source) = expectation.check_during_capture(context).await {
|
||||
warn!(expectation = expectation.name(), %source, "expectation failed during capture");
|
||||
|
||||
return Err(capture_check_failure(expectation.name(), source));
|
||||
}
|
||||
}
|
||||
|
||||
Err(_) => Ok(false),
|
||||
debug!(
|
||||
expectations = expectation_count,
|
||||
"expectation capture check pass"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_idle_window_with_capture_checks(
|
||||
duration: Duration,
|
||||
expectations: &mut [Box<dyn Expectation<E>>],
|
||||
context: &RunContext<E>,
|
||||
) -> Result<(), ScenarioError> {
|
||||
if duration.is_zero() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let timer = sleep(duration);
|
||||
tokio::pin!(timer);
|
||||
let mut capture_tick = capture_check_interval();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut timer => return Ok(()),
|
||||
_ = capture_tick.tick() => {
|
||||
Self::run_capture_checks(expectations, context).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -274,19 +362,14 @@ impl<E: Application> Runner<E> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn idle_until_duration(duration: Duration) -> Result<(), ScenarioError> {
|
||||
if duration.is_zero() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let _ = timeout(duration, async { future::pending::<()>().await }).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Normalises an optional cooldown: `None` and zero-length durations both
/// yield `None`; any other duration is passed through unchanged.
fn nonzero_cooldown(cooldown: Option<Duration>) -> Option<Duration> {
    match cooldown {
        Some(window) if !window.is_zero() => Some(window),
        _ => None,
    }
}
|
||||
|
||||
fn capture_check_interval() -> Interval {
|
||||
interval(EXPECTATION_CAPTURE_CHECK_INTERVAL)
|
||||
}
|
||||
|
||||
fn panic_message(panic: Box<dyn Any + Send>) -> String {
|
||||
panic
|
||||
.downcast_ref::<&str>()
|
||||
@ -295,6 +378,10 @@ fn panic_message(panic: Box<dyn Any + Send>) -> String {
|
||||
.unwrap_or_else(|| UNKNOWN_PANIC.to_owned())
|
||||
}
|
||||
|
||||
fn capture_check_failure(expectation: &str, source: DynError) -> ScenarioError {
|
||||
ScenarioError::ExpectationFailedDuringCapture(format!("{expectation}: {source}").into())
|
||||
}
|
||||
|
||||
fn expectation_failure_summary(failures: Vec<(String, DynError)>) -> String {
|
||||
failures
|
||||
.into_iter()
|
||||
|
||||
@ -126,9 +126,9 @@ impl From<ScenarioError> for ProcessDeployerError {
|
||||
fn from(value: ScenarioError) -> Self {
|
||||
match value {
|
||||
ScenarioError::Workload(source) => Self::WorkloadFailed { source },
|
||||
ScenarioError::ExpectationCapture(source) | ScenarioError::Expectations(source) => {
|
||||
Self::ExpectationsFailed { source }
|
||||
}
|
||||
ScenarioError::ExpectationCapture(source)
|
||||
| ScenarioError::ExpectationFailedDuringCapture(source)
|
||||
| ScenarioError::Expectations(source) => Self::ExpectationsFailed { source },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user