From d4958b5b1a19c594510ec52d1f561a442ed2ea51 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 19 Dec 2025 01:55:17 +0100 Subject: [PATCH] workflows: split capture task setup Refactors long expectation capture functions into phase helpers. - DaWorkloadExpectation::start_capture - Before: planned channel math + shared state init + 2 spawned tasks + state install in one method. - After: planned_channel_ids/build_capture_state/spawn_run_block_counter/spawn_da_capture. - TxInclusionExpectation::start_capture - Before: wallet/plan validation + tracked set build + capture loop in one method. - After: build_capture_plan/spawn_tx_inclusion_capture/capture_tx_outputs. --- .../workflows/src/workloads/da/expectation.rs | 143 ++++++++++-------- .../src/workloads/transaction/expectation.rs | 127 +++++++++------- 2 files changed, 157 insertions(+), 113 deletions(-) diff --git a/testing-framework/workflows/src/workloads/da/expectation.rs b/testing-framework/workflows/src/workloads/da/expectation.rs index 94ffec0..cf5b4ad 100644 --- a/testing-framework/workflows/src/workloads/da/expectation.rs +++ b/testing-framework/workflows/src/workloads/da/expectation.rs @@ -102,11 +102,7 @@ impl Expectation for DaWorkloadExpectation { return Ok(()); } - let planned_ids = planned_channel_ids(planned_channel_count( - self.channel_rate_per_block, - self.headroom_percent, - )); - + let planned_ids = self.planned_channel_ids(); let run_duration = ctx.run_metrics().run_duration(); tracing::info!( @@ -117,65 +113,22 @@ impl Expectation for DaWorkloadExpectation { "DA inclusion expectation starting capture" ); - let planned = Arc::new(planned_ids.iter().copied().collect::>()); - let inscriptions = Arc::new(Mutex::new(HashSet::new())); - let blobs = Arc::new(Mutex::new(HashMap::new())); - let run_blocks = Arc::new(AtomicU64::new(0)); + let capture = build_capture_state(planned_ids, run_duration); + let block_feed = ctx.block_feed(); - { - let run_blocks = Arc::clone(&run_blocks); - let mut receiver = ctx.block_feed().subscribe(); - spawn(async move { - let timer = sleep(run_duration); - pin!(timer); - - loop { - select! { - _ = &mut timer => break, - result = receiver.recv() => match result { - Ok(_) => { - run_blocks.fetch_add(1, Ordering::Relaxed); - } - Err(broadcast::error::RecvError::Lagged(_)) => {} - Err(broadcast::error::RecvError::Closed) => break, - } - } - } - }); - } - - let mut receiver = ctx.block_feed().subscribe(); - let planned_for_task = Arc::clone(&planned); - let inscriptions_for_task = Arc::clone(&inscriptions); - let blobs_for_task = Arc::clone(&blobs); - - spawn(async move { - loop { - match receiver.recv().await { - Ok(record) => capture_block( - record.as_ref(), - &planned_for_task, - &inscriptions_for_task, - &blobs_for_task, - ), - Err(broadcast::error::RecvError::Lagged(skipped)) => { - tracing::debug!(skipped, "DA expectation: receiver lagged"); - } - Err(broadcast::error::RecvError::Closed) => { - tracing::debug!("DA expectation: block feed closed"); - break; - } - } - } - }); - - self.capture_state = Some(CaptureState { - planned, - inscriptions, - blobs, - run_blocks, + spawn_run_block_counter( + Arc::clone(&capture.run_blocks), run_duration, - }); + block_feed.clone(), + ); + spawn_da_capture( + Arc::clone(&capture.planned), + Arc::clone(&capture.inscriptions), + Arc::clone(&capture.blobs), + block_feed, + ); + + self.capture_state = Some(capture); Ok(()) } @@ -202,6 +155,13 @@ struct BlobObservation { } impl DaWorkloadExpectation { + fn planned_channel_ids(&self) -> Vec { + planned_channel_ids(planned_channel_count( + self.channel_rate_per_block, + self.headroom_percent, + )) + } + fn capture_state(&self) -> Result<&CaptureState, DynError> { self.capture_state .as_ref() @@ -338,6 +298,65 @@ impl DaWorkloadExpectation { } } +fn build_capture_state(planned_ids: Vec, run_duration: Duration) -> CaptureState { + CaptureState { + planned: Arc::new(planned_ids.into_iter().collect()), + inscriptions: Arc::new(Mutex::new(HashSet::new())), + blobs: Arc::new(Mutex::new(HashMap::new())), + run_blocks: Arc::new(AtomicU64::new(0)), + run_duration, + } +} + +fn spawn_run_block_counter( + run_blocks: Arc, + run_duration: Duration, + block_feed: testing_framework_core::scenario::BlockFeed, +) { + let mut receiver = block_feed.subscribe(); + spawn(async move { + let timer = sleep(run_duration); + pin!(timer); + + loop { + select! { + _ = &mut timer => break, + result = receiver.recv() => match result { + Ok(_) => { + run_blocks.fetch_add(1, Ordering::Relaxed); + } + Err(broadcast::error::RecvError::Lagged(_)) => {} + Err(broadcast::error::RecvError::Closed) => break, + } + } + } + }); +} + +fn spawn_da_capture( + planned: Arc>, + inscriptions: Arc>>, + blobs: Arc>>, + block_feed: testing_framework_core::scenario::BlockFeed, +) { + let mut receiver = block_feed.subscribe(); + + spawn(async move { + loop { + match receiver.recv().await { + Ok(record) => capture_block(record.as_ref(), &planned, &inscriptions, &blobs), + Err(broadcast::error::RecvError::Lagged(skipped)) => { + tracing::debug!(skipped, "DA expectation: receiver lagged"); + } + Err(broadcast::error::RecvError::Closed) => { + tracing::debug!("DA expectation: block feed closed"); + break; + } + } + } + }); +} + fn capture_block( block: &BlockRecord, planned: &HashSet, diff --git a/testing-framework/workflows/src/workloads/transaction/expectation.rs b/testing-framework/workflows/src/workloads/transaction/expectation.rs index 53e20a7..eb258da 100644 --- a/testing-framework/workflows/src/workloads/transaction/expectation.rs +++ b/testing-framework/workflows/src/workloads/transaction/expectation.rs @@ -15,7 +15,7 @@ use testing_framework_core::scenario::{DynError, Expectation, RunContext}; use thiserror::Error; use tokio::{sync::broadcast, time::sleep}; -use super::workload::{limited_user_count, submission_plan}; +use super::workload::{SubmissionPlan, limited_user_count, submission_plan}; const MIN_INCLUSION_RATIO: f64 = 0.5; const CATCHUP_POLL_INTERVAL: Duration = Duration::from_secs(1); @@ -74,13 +74,7 @@ impl Expectation for TxInclusionExpectation { return Ok(()); } - let wallet_accounts = ctx.descriptors().config().wallet().accounts.clone(); - if wallet_accounts.is_empty() { - return Err(TxExpectationError::MissingAccounts.into()); - } - - let available = limited_user_count(self.user_limit, wallet_accounts.len()); - let plan = submission_plan(self.txs_per_block, ctx, available)?; + let (plan, tracked_accounts) = build_capture_plan(self, ctx)?; if plan.transaction_count == 0 { return Err(TxExpectationError::NoPlannedTransactions.into()); } @@ -92,50 +86,12 @@ impl Expectation for TxInclusionExpectation { "tx inclusion expectation starting capture" ); - let wallet_pks = wallet_accounts - .into_iter() - .take(plan.transaction_count) - .map(|account| account.secret_key.to_public_key()) - .collect::>(); - let observed = Arc::new(AtomicU64::new(0)); - let receiver = ctx.block_feed().subscribe(); - let tracked_accounts: Arc> = Arc::new(wallet_pks); - let spawn_accounts: Arc> = Arc::clone(&tracked_accounts); - let spawn_observed = Arc::clone(&observed); - - tokio::spawn(async move { - let mut receiver = receiver; - let genesis_parent = HeaderId::from([0; 32]); - tracing::debug!("tx inclusion capture task started"); - loop { - match receiver.recv().await { - Ok(record) => { - if record.block.header().parent_block() == genesis_parent { - continue; - } - - for tx in record.block.transactions() { - for note in &tx.mantle_tx().ledger_tx.outputs { - if spawn_accounts.contains(¬e.pk) { - spawn_observed.fetch_add(1, Ordering::Relaxed); - tracing::debug!(pk = ?note.pk, "tx inclusion observed account output"); - break; - } - } - } - } - Err(broadcast::error::RecvError::Lagged(skipped)) => { - tracing::debug!(skipped, "tx inclusion capture lagged"); - } - Err(broadcast::error::RecvError::Closed) => { - tracing::debug!("tx inclusion capture feed closed"); - break; - } - } - } - tracing::debug!("tx inclusion capture task exiting"); - }); + spawn_tx_inclusion_capture( + ctx.block_feed().subscribe(), + Arc::new(tracked_accounts), + Arc::clone(&observed), + ); self.capture_state = Some(CaptureState { observed, @@ -190,3 +146,72 @@ impl Expectation for TxInclusionExpectation { } } } + +fn build_capture_plan( + expectation: &TxInclusionExpectation, + ctx: &RunContext, +) -> Result<(SubmissionPlan, HashSet), DynError> { + let wallet_accounts = ctx.descriptors().config().wallet().accounts.clone(); + if wallet_accounts.is_empty() { + return Err(TxExpectationError::MissingAccounts.into()); + } + + let available = limited_user_count(expectation.user_limit, wallet_accounts.len()); + let plan = submission_plan(expectation.txs_per_block, ctx, available)?; + + let wallet_pks = wallet_accounts + .into_iter() + .take(plan.transaction_count) + .map(|account| account.secret_key.to_public_key()) + .collect::>(); + + Ok((plan, wallet_pks)) +} + +fn spawn_tx_inclusion_capture( + mut receiver: broadcast::Receiver>, + tracked_accounts: Arc>, + observed: Arc, +) { + tokio::spawn(async move { + let genesis_parent = HeaderId::from([0; 32]); + tracing::debug!("tx inclusion capture task started"); + + loop { + match receiver.recv().await { + Ok(record) => { + if record.block.header().parent_block() == genesis_parent { + continue; + } + + capture_tx_outputs(record.as_ref(), &tracked_accounts, &observed); + } + Err(broadcast::error::RecvError::Lagged(skipped)) => { + tracing::debug!(skipped, "tx inclusion capture lagged"); + } + Err(broadcast::error::RecvError::Closed) => { + tracing::debug!("tx inclusion capture feed closed"); + break; + } + } + } + + tracing::debug!("tx inclusion capture task exiting"); + }); +} + +fn capture_tx_outputs( + record: &testing_framework_core::scenario::BlockRecord, + tracked_accounts: &HashSet, + observed: &AtomicU64, +) { + for tx in record.block.transactions() { + for note in &tx.mantle_tx().ledger_tx.outputs { + if tracked_accounts.contains(¬e.pk) { + observed.fetch_add(1, Ordering::Relaxed); + tracing::debug!(pk = ?note.pk, "tx inclusion observed account output"); + break; + } + } + } +}