use std::{ collections::HashSet, num::{NonZeroU64, NonZeroUsize}, sync::{ Arc, atomic::{AtomicU64, Ordering}, }, time::Duration, }; use async_trait::async_trait; use lb_core::{header::HeaderId, mantle::AuthenticatedMantleTx as _}; use lb_key_management_system_service::keys::ZkPublicKey; use testing_framework_core::scenario::{DynError, Expectation, RunContext}; use thiserror::Error; use tokio::{sync::broadcast, time::sleep}; use super::workload::{SubmissionPlan, limited_user_count, submission_plan}; const MIN_INCLUSION_RATIO: f64 = 0.5; const CATCHUP_POLL_INTERVAL: Duration = Duration::from_secs(1); const MAX_CATCHUP_WAIT: Duration = Duration::from_secs(60); #[derive(Clone)] pub struct TxInclusionExpectation { txs_per_block: NonZeroU64, user_limit: Option, capture_state: Option, } #[derive(Clone)] struct CaptureState { observed: Arc, expected: u64, } #[derive(Debug, Error)] enum TxExpectationError { #[error("transaction workload requires seeded accounts")] MissingAccounts, #[error("transaction workload planned zero transactions")] NoPlannedTransactions, #[error("transaction inclusion expectation not captured")] NotCaptured, #[error("transaction inclusion observed {observed} below required {required}")] InsufficientInclusions { observed: u64, required: u64 }, } impl TxInclusionExpectation { /// Expectation that checks a minimum fraction of planned transactions were /// included. pub const NAME: &'static str = "tx_inclusion_expectation"; /// Constructs an inclusion expectation using the same parameters as the /// workload. #[must_use] pub const fn new(txs_per_block: NonZeroU64, user_limit: Option) -> Self { Self { txs_per_block, user_limit, capture_state: None, } } } #[async_trait] impl Expectation for TxInclusionExpectation { fn name(&self) -> &'static str { Self::NAME } async fn start_capture(&mut self, ctx: &RunContext) -> Result<(), DynError> { if self.capture_state.is_some() { return Ok(()); } let (plan, tracked_accounts) = build_capture_plan(self, ctx)?; if plan.transaction_count == 0 { return Err(TxExpectationError::NoPlannedTransactions.into()); } tracing::info!( planned_txs = plan.transaction_count, txs_per_block = self.txs_per_block.get(), user_limit = self.user_limit.map(|u| u.get()), "tx inclusion expectation starting capture" ); let observed = Arc::new(AtomicU64::new(0)); spawn_tx_inclusion_capture( ctx.block_feed().subscribe(), Arc::new(tracked_accounts), Arc::clone(&observed), ); self.capture_state = Some(CaptureState { observed, expected: plan.transaction_count as u64, }); Ok(()) } async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { let state = self .capture_state .as_ref() .ok_or(TxExpectationError::NotCaptured)?; let required = ((state.expected as f64) * MIN_INCLUSION_RATIO).ceil() as u64; let mut observed = state.observed.load(Ordering::Relaxed); if observed < required { let security_param = ctx.descriptors().config().consensus_params.security_param; let hinted_wait = ctx .run_metrics() .block_interval_hint() .map(|interval| interval.mul_f64(security_param.get() as f64)); let mut remaining = hinted_wait .unwrap_or(MAX_CATCHUP_WAIT) .min(MAX_CATCHUP_WAIT); while observed < required && remaining > Duration::ZERO { sleep(CATCHUP_POLL_INTERVAL).await; remaining = remaining.saturating_sub(CATCHUP_POLL_INTERVAL); observed = state.observed.load(Ordering::Relaxed); } } if observed >= required { tracing::info!( observed, required, expected = state.expected, "tx inclusion expectation satisfied" ); Ok(()) } else { tracing::warn!( observed, required, expected = state.expected, "tx inclusion expectation failed" ); Err(TxExpectationError::InsufficientInclusions { observed, required }.into()) } } } fn build_capture_plan( expectation: &TxInclusionExpectation, ctx: &RunContext, ) -> Result<(SubmissionPlan, HashSet), DynError> { let wallet_accounts = ctx.descriptors().config().wallet().accounts.clone(); if wallet_accounts.is_empty() { return Err(TxExpectationError::MissingAccounts.into()); } let available = limited_user_count(expectation.user_limit, wallet_accounts.len()); let plan = submission_plan(expectation.txs_per_block, ctx, available)?; let wallet_pks = wallet_accounts .into_iter() .take(plan.transaction_count) .map(|account| account.secret_key.to_public_key()) .collect::>(); Ok((plan, wallet_pks)) } fn spawn_tx_inclusion_capture( mut receiver: broadcast::Receiver>, tracked_accounts: Arc>, observed: Arc, ) { tokio::spawn(async move { let genesis_parent = HeaderId::from([0; 32]); tracing::debug!("tx inclusion capture task started"); loop { match receiver.recv().await { Ok(record) => { if record.block.header().parent_block() == genesis_parent { continue; } capture_tx_outputs(record.as_ref(), &tracked_accounts, &observed); } Err(broadcast::error::RecvError::Lagged(skipped)) => { tracing::debug!(skipped, "tx inclusion capture lagged"); } Err(broadcast::error::RecvError::Closed) => { tracing::debug!("tx inclusion capture feed closed"); break; } } } tracing::debug!("tx inclusion capture task exiting"); }); } fn capture_tx_outputs( record: &testing_framework_core::scenario::BlockRecord, tracked_accounts: &HashSet, observed: &AtomicU64, ) { for tx in record.block.transactions() { for note in &tx.mantle_tx().ledger_tx.outputs { if tracked_accounts.contains(¬e.pk) { observed.fetch_add(1, Ordering::Relaxed); tracing::debug!(pk = ?note.pk, "tx inclusion observed account output"); break; } } } }