workflows: split capture task setup

Refactors long expectation capture functions into phase helpers.

- DaWorkloadExpectation::start_capture
  - Before: planned channel math + shared state init + 2 spawned tasks + state install in one method.
  - After: planned_channel_ids/build_capture_state/spawn_run_block_counter/spawn_da_capture.

- TxInclusionExpectation::start_capture
  - Before: wallet/plan validation + tracked set build + capture loop in one method.
  - After: build_capture_plan/spawn_tx_inclusion_capture/capture_tx_outputs.
This commit is contained in:
andrussal 2025-12-19 01:55:17 +01:00
parent 870280e354
commit d4958b5b1a
2 changed files with 157 additions and 113 deletions

View File

@ -102,11 +102,7 @@ impl Expectation for DaWorkloadExpectation {
return Ok(());
}
let planned_ids = planned_channel_ids(planned_channel_count(
self.channel_rate_per_block,
self.headroom_percent,
));
let planned_ids = self.planned_channel_ids();
let run_duration = ctx.run_metrics().run_duration();
tracing::info!(
@ -117,65 +113,22 @@ impl Expectation for DaWorkloadExpectation {
"DA inclusion expectation starting capture"
);
let planned = Arc::new(planned_ids.iter().copied().collect::<HashSet<_>>());
let inscriptions = Arc::new(Mutex::new(HashSet::new()));
let blobs = Arc::new(Mutex::new(HashMap::new()));
let run_blocks = Arc::new(AtomicU64::new(0));
let capture = build_capture_state(planned_ids, run_duration);
let block_feed = ctx.block_feed();
{
let run_blocks = Arc::clone(&run_blocks);
let mut receiver = ctx.block_feed().subscribe();
spawn(async move {
let timer = sleep(run_duration);
pin!(timer);
loop {
select! {
_ = &mut timer => break,
result = receiver.recv() => match result {
Ok(_) => {
run_blocks.fetch_add(1, Ordering::Relaxed);
}
Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
});
}
let mut receiver = ctx.block_feed().subscribe();
let planned_for_task = Arc::clone(&planned);
let inscriptions_for_task = Arc::clone(&inscriptions);
let blobs_for_task = Arc::clone(&blobs);
spawn(async move {
loop {
match receiver.recv().await {
Ok(record) => capture_block(
record.as_ref(),
&planned_for_task,
&inscriptions_for_task,
&blobs_for_task,
),
Err(broadcast::error::RecvError::Lagged(skipped)) => {
tracing::debug!(skipped, "DA expectation: receiver lagged");
}
Err(broadcast::error::RecvError::Closed) => {
tracing::debug!("DA expectation: block feed closed");
break;
}
}
}
});
self.capture_state = Some(CaptureState {
planned,
inscriptions,
blobs,
run_blocks,
spawn_run_block_counter(
Arc::clone(&capture.run_blocks),
run_duration,
});
block_feed.clone(),
);
spawn_da_capture(
Arc::clone(&capture.planned),
Arc::clone(&capture.inscriptions),
Arc::clone(&capture.blobs),
block_feed,
);
self.capture_state = Some(capture);
Ok(())
}
@ -202,6 +155,13 @@ struct BlobObservation {
}
impl DaWorkloadExpectation {
/// Derives the full set of channel ids this expectation plans to observe,
/// sized from the configured per-block rate plus headroom.
fn planned_channel_ids(&self) -> Vec<ChannelId> {
    let count = planned_channel_count(self.channel_rate_per_block, self.headroom_percent);
    planned_channel_ids(count)
}
fn capture_state(&self) -> Result<&CaptureState, DynError> {
self.capture_state
.as_ref()
@ -338,6 +298,65 @@ impl DaWorkloadExpectation {
}
}
/// Builds the shared capture state handed to the spawned capture tasks:
/// the planned channel set plus empty observation containers and a zeroed
/// run-block counter.
fn build_capture_state(planned_ids: Vec<ChannelId>, run_duration: Duration) -> CaptureState {
    // Planned ids become an immutable set shared read-only across tasks.
    let planned: HashSet<ChannelId> = planned_ids.into_iter().collect();
    CaptureState {
        planned: Arc::new(planned),
        inscriptions: Arc::new(Mutex::new(HashSet::new())),
        blobs: Arc::new(Mutex::new(HashMap::new())),
        run_blocks: Arc::new(AtomicU64::new(0)),
        run_duration,
    }
}
/// Spawns a task that counts blocks seen on the feed until `run_duration`
/// elapses, incrementing `run_blocks` once per received block.
fn spawn_run_block_counter(
    run_blocks: Arc<AtomicU64>,
    run_duration: Duration,
    block_feed: testing_framework_core::scenario::BlockFeed,
) {
    let mut rx = block_feed.subscribe();
    spawn(async move {
        // Pin the deadline so it can be polled repeatedly inside select!.
        let deadline = sleep(run_duration);
        pin!(deadline);
        loop {
            select! {
                _ = &mut deadline => break,
                recv = rx.recv() => match recv {
                    Ok(_) => {
                        run_blocks.fetch_add(1, Ordering::Relaxed);
                    }
                    // A lagged receiver drops some counts but keeps the task alive.
                    Err(broadcast::error::RecvError::Lagged(_)) => {}
                    Err(broadcast::error::RecvError::Closed) => break,
                }
            }
        }
    });
}
/// Spawns the DA capture task: every block record received from the feed is
/// folded into the shared inscription/blob observations via `capture_block`.
/// The task exits when the block feed closes.
fn spawn_da_capture(
    planned: Arc<HashSet<ChannelId>>,
    inscriptions: Arc<Mutex<HashSet<ChannelId>>>,
    blobs: Arc<Mutex<HashMap<ChannelId, u64>>>,
    block_feed: testing_framework_core::scenario::BlockFeed,
) {
    let mut rx = block_feed.subscribe();
    spawn(async move {
        loop {
            let event = rx.recv().await;
            match event {
                Ok(record) => {
                    capture_block(record.as_ref(), &planned, &inscriptions, &blobs);
                }
                // Lagging only skips records; keep consuming.
                Err(broadcast::error::RecvError::Lagged(skipped)) => {
                    tracing::debug!(skipped, "DA expectation: receiver lagged");
                }
                Err(broadcast::error::RecvError::Closed) => {
                    tracing::debug!("DA expectation: block feed closed");
                    break;
                }
            }
        }
    });
}
fn capture_block(
block: &BlockRecord,
planned: &HashSet<ChannelId>,

View File

@ -15,7 +15,7 @@ use testing_framework_core::scenario::{DynError, Expectation, RunContext};
use thiserror::Error;
use tokio::{sync::broadcast, time::sleep};
use super::workload::{limited_user_count, submission_plan};
use super::workload::{SubmissionPlan, limited_user_count, submission_plan};
const MIN_INCLUSION_RATIO: f64 = 0.5;
const CATCHUP_POLL_INTERVAL: Duration = Duration::from_secs(1);
@ -74,13 +74,7 @@ impl Expectation for TxInclusionExpectation {
return Ok(());
}
let wallet_accounts = ctx.descriptors().config().wallet().accounts.clone();
if wallet_accounts.is_empty() {
return Err(TxExpectationError::MissingAccounts.into());
}
let available = limited_user_count(self.user_limit, wallet_accounts.len());
let plan = submission_plan(self.txs_per_block, ctx, available)?;
let (plan, tracked_accounts) = build_capture_plan(self, ctx)?;
if plan.transaction_count == 0 {
return Err(TxExpectationError::NoPlannedTransactions.into());
}
@ -92,50 +86,12 @@ impl Expectation for TxInclusionExpectation {
"tx inclusion expectation starting capture"
);
let wallet_pks = wallet_accounts
.into_iter()
.take(plan.transaction_count)
.map(|account| account.secret_key.to_public_key())
.collect::<HashSet<ZkPublicKey>>();
let observed = Arc::new(AtomicU64::new(0));
let receiver = ctx.block_feed().subscribe();
let tracked_accounts: Arc<HashSet<ZkPublicKey>> = Arc::new(wallet_pks);
let spawn_accounts: Arc<HashSet<ZkPublicKey>> = Arc::clone(&tracked_accounts);
let spawn_observed = Arc::clone(&observed);
tokio::spawn(async move {
let mut receiver = receiver;
let genesis_parent = HeaderId::from([0; 32]);
tracing::debug!("tx inclusion capture task started");
loop {
match receiver.recv().await {
Ok(record) => {
if record.block.header().parent_block() == genesis_parent {
continue;
}
for tx in record.block.transactions() {
for note in &tx.mantle_tx().ledger_tx.outputs {
if spawn_accounts.contains(&note.pk) {
spawn_observed.fetch_add(1, Ordering::Relaxed);
tracing::debug!(pk = ?note.pk, "tx inclusion observed account output");
break;
}
}
}
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
tracing::debug!(skipped, "tx inclusion capture lagged");
}
Err(broadcast::error::RecvError::Closed) => {
tracing::debug!("tx inclusion capture feed closed");
break;
}
}
}
tracing::debug!("tx inclusion capture task exiting");
});
spawn_tx_inclusion_capture(
ctx.block_feed().subscribe(),
Arc::new(tracked_accounts),
Arc::clone(&observed),
);
self.capture_state = Some(CaptureState {
observed,
@ -190,3 +146,72 @@ impl Expectation for TxInclusionExpectation {
}
}
}
/// Validates the wallet configuration and produces the submission plan along
/// with the set of public keys whose outputs the capture task should track.
///
/// Errors with `MissingAccounts` when the config has no wallet accounts.
fn build_capture_plan(
    expectation: &TxInclusionExpectation,
    ctx: &RunContext,
) -> Result<(SubmissionPlan, HashSet<ZkPublicKey>), DynError> {
    let accounts = ctx.descriptors().config().wallet().accounts.clone();
    if accounts.is_empty() {
        return Err(TxExpectationError::MissingAccounts.into());
    }
    let user_count = limited_user_count(expectation.user_limit, accounts.len());
    let plan = submission_plan(expectation.txs_per_block, ctx, user_count)?;
    // Only the accounts actually used by the plan are tracked.
    let tracked: HashSet<ZkPublicKey> = accounts
        .into_iter()
        .take(plan.transaction_count)
        .map(|account| account.secret_key.to_public_key())
        .collect();
    Ok((plan, tracked))
}
/// Spawns the tx-inclusion capture task: counts outputs paying any tracked
/// account in every non-genesis-child block, until the feed closes.
fn spawn_tx_inclusion_capture(
    mut receiver: broadcast::Receiver<Arc<testing_framework_core::scenario::BlockRecord>>,
    tracked_accounts: Arc<HashSet<ZkPublicKey>>,
    observed: Arc<AtomicU64>,
) {
    tokio::spawn(async move {
        // Blocks whose parent is the all-zero header id are skipped.
        let genesis_parent = HeaderId::from([0; 32]);
        tracing::debug!("tx inclusion capture task started");
        loop {
            let event = receiver.recv().await;
            match event {
                // Guard arm: ignore the block that directly follows genesis.
                Ok(record) if record.block.header().parent_block() == genesis_parent => {}
                Ok(record) => {
                    capture_tx_outputs(record.as_ref(), &tracked_accounts, &observed);
                }
                Err(broadcast::error::RecvError::Lagged(skipped)) => {
                    tracing::debug!(skipped, "tx inclusion capture lagged");
                }
                Err(broadcast::error::RecvError::Closed) => {
                    tracing::debug!("tx inclusion capture feed closed");
                    break;
                }
            }
        }
        tracing::debug!("tx inclusion capture task exiting");
    });
}
/// Scans a block's transactions and bumps `observed` once per transaction
/// that pays at least one tracked account (at most one increment per tx,
/// matching the original early-break semantics).
fn capture_tx_outputs(
    record: &testing_framework_core::scenario::BlockRecord,
    tracked_accounts: &HashSet<ZkPublicKey>,
    observed: &AtomicU64,
) {
    for tx in record.block.transactions() {
        // First output paying a tracked account, if any.
        let hit = tx
            .mantle_tx()
            .ledger_tx
            .outputs
            .iter()
            .find(|note| tracked_accounts.contains(&note.pk));
        if let Some(note) = hit {
            observed.fetch_add(1, Ordering::Relaxed);
            tracing::debug!(pk = ?note.pk, "tx inclusion observed account output");
        }
    }
}