From a1e0dddea3970610a902c68d18090057c4912586 Mon Sep 17 00:00:00 2001 From: andrussal Date: Thu, 18 Dec 2025 23:10:22 +0100 Subject: [PATCH] workflows: remove panics from workloads --- testing-framework/workflows/src/util/tx.rs | 11 ++++++---- .../workflows/src/workloads/chaos.rs | 18 +++++++++------- .../workflows/src/workloads/da/expectation.rs | 21 ++++++++++++------- .../workflows/src/workloads/da/workload.rs | 14 +++---------- .../src/workloads/transaction/workload.rs | 2 +- 5 files changed, 36 insertions(+), 30 deletions(-) diff --git a/testing-framework/workflows/src/util/tx.rs b/testing-framework/workflows/src/util/tx.rs index 4a3ade7..4f60210 100644 --- a/testing-framework/workflows/src/util/tx.rs +++ b/testing-framework/workflows/src/util/tx.rs @@ -4,11 +4,11 @@ use nomos_core::mantle::{ ledger::Tx as LedgerTx, ops::channel::{ChannelId, MsgId, inscribe::InscriptionOp}, }; +use testing_framework_core::scenario::DynError; /// Builds a signed inscription transaction with deterministic payload for /// testing. -#[must_use] -pub fn create_inscription_transaction_with_id(id: ChannelId) -> SignedMantleTx { +pub fn create_inscription_transaction_with_id(id: ChannelId) -> Result { let signing_key = Ed25519Key::from_bytes(&[0u8; 32]); let signer = signing_key.public_key(); @@ -31,10 +31,13 @@ pub fn create_inscription_transaction_with_id(id: ChannelId) -> SignedMantleTx { let zk_key = ZkKey::zero(); tracing::debug!(channel = ?id, tx_hash = ?tx_hash, "building inscription transaction"); + let zk_signature = ZkKey::multi_sign(&[zk_key], tx_hash.as_ref()) + .map_err(|err| format!("zk signature generation failed: {err}"))?; + SignedMantleTx::new( mantle_tx, vec![OpProof::Ed25519Sig(signature)], - ZkKey::multi_sign(&[zk_key], tx_hash.as_ref()).expect("zk signature generation"), + zk_signature, ) - .expect("valid transaction") + .map_err(|err| format!("failed to build signed mantle transaction: {err}").into()) } diff --git a/testing-framework/workflows/src/workloads/chaos.rs b/testing-framework/workflows/src/workloads/chaos.rs index 8e13120..7a3142f 100644 --- a/testing-framework/workflows/src/workloads/chaos.rs +++ b/testing-framework/workflows/src/workloads/chaos.rs @@ -93,7 +93,11 @@ impl RandomRestartWorkload { &self, targets: &[Target], cooldowns: &HashMap, - ) -> Target { + ) -> Result { + if targets.is_empty() { + return Err("chaos restart workload has no eligible targets".into()); + } + loop { let now = Instant::now(); if let Some(next_ready) = cooldowns @@ -121,13 +125,13 @@ impl RandomRestartWorkload { if let Some(choice) = available.choose(&mut thread_rng()).copied() { tracing::debug!(?choice, "chaos restart picked target"); - return choice; + return Ok(choice); } - return targets - .choose(&mut thread_rng()) - .copied() - .expect("chaos restart workload has targets"); + if let Some(choice) = targets.choose(&mut thread_rng()).copied() { + return Ok(choice); + } + return Err("chaos restart workload has no eligible targets".into()); } } } @@ -160,7 +164,7 @@ impl Workload for RandomRestartWorkload { loop { sleep(self.random_delay()).await; - let target = self.pick_target(&targets, &cooldowns).await; + let target = self.pick_target(&targets, &cooldowns).await?; match target { Target::Validator(index) => { diff --git a/testing-framework/workflows/src/workloads/da/expectation.rs b/testing-framework/workflows/src/workloads/da/expectation.rs index a340655..94ffec0 100644 --- a/testing-framework/workflows/src/workloads/da/expectation.rs +++ b/testing-framework/workflows/src/workloads/da/expectation.rs @@ -19,6 +19,16 @@ use tokio::{pin, select, spawn, sync::broadcast, time::sleep}; use super::workload::{planned_channel_count, planned_channel_ids}; +fn lock_or_recover<'a, T>(mutex: &'a Mutex, name: &'static str) -> std::sync::MutexGuard<'a, T> { + match mutex.lock() { + Ok(guard) => guard, + Err(poisoned) => { + tracing::warn!(lock = name, "mutex poisoned; recovering inner value"); + poisoned.into_inner() + } + } +} + #[derive(Debug)] pub struct DaWorkloadExpectation { blob_rate_per_block: NonZeroU64, @@ -277,15 +287,12 @@ impl DaWorkloadExpectation { } fn missing_inscriptions(&self, state: &CaptureState) -> Vec { - let inscriptions = state - .inscriptions - .lock() - .expect("inscription lock poisoned"); + let inscriptions = lock_or_recover(&state.inscriptions, "da_inscriptions"); missing_channels(&state.planned, &inscriptions) } fn observe_blobs(&self, state: &CaptureState) -> BlobObservation { - let blobs = state.blobs.lock().expect("blob lock poisoned"); + let blobs = lock_or_recover(&state.blobs, "da_blobs"); let observed_total_blobs = blobs.values().sum::(); let channels_with_blobs = blobs .iter() @@ -355,13 +362,13 @@ fn capture_block( } if !new_inscriptions.is_empty() { - let mut guard = inscriptions.lock().expect("inscription lock poisoned"); + let mut guard = lock_or_recover(inscriptions, "da_inscriptions"); guard.extend(new_inscriptions); tracing::debug!(count = guard.len(), "DA expectation captured inscriptions"); } if !new_blobs.is_empty() { - let mut guard = blobs.lock().expect("blob lock poisoned"); + let mut guard = lock_or_recover(blobs, "da_blobs"); for channel in new_blobs { let entry = guard.entry(channel).or_insert(0); *entry += 1; diff --git a/testing-framework/workflows/src/workloads/da/workload.rs b/testing-framework/workflows/src/workloads/da/workload.rs index 5f919aa..231809b 100644 --- a/testing-framework/workflows/src/workloads/da/workload.rs +++ b/testing-framework/workflows/src/workloads/da/workload.rs @@ -28,8 +28,6 @@ use crate::{ }; const TEST_KEY_BYTES: [u8; 32] = [0u8; 32]; -const DEFAULT_BLOB_RATE_PER_BLOCK: u64 = 1; -const DEFAULT_CHANNEL_RATE_PER_BLOCK: u64 = 1; const BLOB_CHUNK_OPTIONS: &[usize] = &[1, 2, 4, 8]; const PUBLISH_RETRIES: usize = 5; const PUBLISH_RETRY_DELAY: Duration = Duration::from_secs(2); @@ -44,11 +42,7 @@ pub struct Workload { impl Default for Workload { fn default() -> Self { - Self::with_rate( - NonZeroU64::new(DEFAULT_BLOB_RATE_PER_BLOCK).expect("non-zero"), - NonZeroU64::new(DEFAULT_CHANNEL_RATE_PER_BLOCK).expect("non-zero"), - DEFAULT_HEADROOM_PERCENT, - ) + Self::with_rate(NonZeroU64::MIN, NonZeroU64::MIN, DEFAULT_HEADROOM_PERCENT) } } @@ -139,7 +133,7 @@ async fn run_channel_flow( target_blobs: u64, ) -> Result<(), DynError> { tracing::debug!(channel_id = ?channel_id, "DA: submitting inscription tx"); - let inscription_tx = Arc::new(tx::create_inscription_transaction_with_id(channel_id)); + let inscription_tx = Arc::new(tx::create_inscription_transaction_with_id(channel_id)?); submit_transaction_via_cluster(ctx, Arc::clone(&inscription_tx)).await?; let mut receiver = ctx.block_feed().subscribe(); @@ -280,9 +274,7 @@ fn random_blob_payload() -> Vec { let mut rng = thread_rng(); // KZGRS encoder expects the polynomial degree to be a power of two, which // effectively constrains the blob chunk count. - let chunks = *BLOB_CHUNK_OPTIONS - .choose(&mut rng) - .expect("non-empty chunk options"); + let chunks = BLOB_CHUNK_OPTIONS.choose(&mut rng).copied().unwrap_or(1); let mut data = vec![0u8; 31 * chunks]; rng.fill_bytes(&mut data); data diff --git a/testing-framework/workflows/src/workloads/transaction/workload.rs b/testing-framework/workflows/src/workloads/transaction/workload.rs index 7d30fdb..4f8c1e2 100644 --- a/testing-framework/workflows/src/workloads/transaction/workload.rs +++ b/testing-framework/workflows/src/workloads/transaction/workload.rs @@ -157,7 +157,7 @@ impl Workload { impl Default for Workload { fn default() -> Self { - Self::new(NonZeroU64::new(1).expect("non-zero")) + Self::new(NonZeroU64::MIN) } }